mirror of https://github.com/facebook/rocksdb.git
Add support to strip / pad timestamp when creating / reading a block based table (#11495)
Summary: Add support to strip timestamp in block based table builder and pad timestamp in block based table reader. On the write path, use the per column family option `AdvancedColumnFamilyOptions.persist_user_defined_timestamps` to indicate whether user-defined timestamps should be stripped for all block based tables created for the column family. On the read path, added a per table `TableReadOption.user_defined_timestamps_persisted` to flag whether the user keys in the table contains user defined timestamps. This patch is mostly passing the related flags down to the block building/parsing level with the exception of handling the `first_internal_key` in `IndexValue`, which is included in the `IndexBuilder` level. The value part of range deletion entries should have a similar handling, I haven't decided where to best fit this piece of logic, I will do it in a follow up. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11495 Test Plan: Existing test `BlockBasedTableReaderTest` is parameterized to run with: 1) different UDT test modes: kNone, kNormal, kStripUserDefinedTimestamp 2) all four index types, when index type is `kTwoLevelIndexSearch`, also enables partitioned filters 3) parallel vs non-parallel compression 4) enable/disable compression dictionary. Also added tests for API `BlockBasedTableReader::NewIterator`. `PartitionedFilterBlockTest` is parameterized to run with different UDT test modes:kNone, kNormal, kStripUserDefinedTimestamp. ``` make all check ./block_based_table_reader_test ./partitioned_filter_block_test ``` Reviewed By: ltamasi Differential Revision: D46344577 Pulled By: jowlyzhang fbshipit-source-id: 93ac8542b19319d1298712b8bed908c8831ba675
This commit is contained in:
parent
9f1ce6d804
commit
9f7877f246
|
@ -63,7 +63,8 @@ InternalIteratorBase<IndexValue>* BinarySearchIndexReader::NewIterator(
|
|||
auto it = index_block.GetValue()->NewIndexIterator(
|
||||
internal_comparator()->user_comparator(),
|
||||
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true,
|
||||
index_has_first_key(), index_key_includes_seq(), index_value_is_full());
|
||||
index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
|
||||
false /* block_contents_pinned */, user_defined_timestamps_persisted());
|
||||
|
||||
assert(it != nullptr);
|
||||
index_block.TransferTo(it);
|
||||
|
|
|
@ -876,7 +876,7 @@ int IndexBlockIter::CompareBlockKey(uint32_t block_index, const Slice& target) {
|
|||
return 1; // Return target is smaller
|
||||
}
|
||||
Slice block_key(key_ptr, non_shared);
|
||||
raw_key_.SetKey(block_key, false /* copy */);
|
||||
UpdateRawKeyAndMaybePadMinTimestamp(block_key);
|
||||
return CompareCurrentKey(target);
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,8 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
|
|||
const ImmutableCFOptions& /*opt*/, const MutableCFOptions& mopt,
|
||||
const FilterBuildingContext& context,
|
||||
const bool use_delta_encoding_for_index_values,
|
||||
PartitionedIndexBuilder* const p_index_builder) {
|
||||
PartitionedIndexBuilder* const p_index_builder, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps) {
|
||||
const BlockBasedTableOptions& table_opt = context.table_options;
|
||||
assert(table_opt.filter_policy); // precondition
|
||||
|
||||
|
@ -95,7 +96,8 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
|
|||
return new PartitionedFilterBlockBuilder(
|
||||
mopt.prefix_extractor.get(), table_opt.whole_key_filtering,
|
||||
filter_bits_builder, table_opt.index_block_restart_interval,
|
||||
use_delta_encoding_for_index_values, p_index_builder, partition_size);
|
||||
use_delta_encoding_for_index_values, p_index_builder, partition_size,
|
||||
ts_sz, persist_user_defined_timestamps);
|
||||
} else {
|
||||
return new FullFilterBlockBuilder(mopt.prefix_extractor.get(),
|
||||
table_opt.whole_key_filtering,
|
||||
|
@ -264,6 +266,20 @@ struct BlockBasedTableBuilder::Rep {
|
|||
const MutableCFOptions moptions;
|
||||
const BlockBasedTableOptions table_options;
|
||||
const InternalKeyComparator& internal_comparator;
|
||||
// Size in bytes for the user-defined timestamps.
|
||||
size_t ts_sz;
|
||||
// When `ts_sz` > 0 and this flag is false, the user-defined timestamp in the
|
||||
// user key will be stripped when creating the block based table. This
|
||||
// stripping happens for all user keys, including the keys in data block,
|
||||
// index block for data block, index block for index block (if index type is
|
||||
// `kTwoLevelIndexSearch`), index for filter blocks (if using partitioned
|
||||
// filters), the `first_internal_key` in `IndexValue`, the `end_key` for range
|
||||
// deletion entries.
|
||||
// As long as the user keys are sorted when added via `Add` API, their logic
|
||||
// ordering won't change after timestamps are stripped. However, for each user
|
||||
// key to be logically equivalent before and after timestamp is stripped, the
|
||||
// user key should contain the minimum timestamp.
|
||||
bool persist_user_defined_timestamps;
|
||||
WritableFileWriter* file;
|
||||
std::atomic<uint64_t> offset;
|
||||
size_t alignment;
|
||||
|
@ -416,6 +432,9 @@ struct BlockBasedTableBuilder::Rep {
|
|||
moptions(tbo.moptions),
|
||||
table_options(table_opt),
|
||||
internal_comparator(tbo.internal_comparator),
|
||||
ts_sz(tbo.internal_comparator.user_comparator()->timestamp_size()),
|
||||
persist_user_defined_timestamps(
|
||||
tbo.ioptions.persist_user_defined_timestamps),
|
||||
file(f),
|
||||
offset(0),
|
||||
alignment(table_options.block_align
|
||||
|
@ -429,8 +448,14 @@ struct BlockBasedTableBuilder::Rep {
|
|||
->CanKeysWithDifferentByteContentsBeEqual()
|
||||
? BlockBasedTableOptions::kDataBlockBinarySearch
|
||||
: table_options.data_block_index_type,
|
||||
table_options.data_block_hash_table_util_ratio),
|
||||
range_del_block(1 /* block_restart_interval */),
|
||||
table_options.data_block_hash_table_util_ratio, ts_sz,
|
||||
persist_user_defined_timestamps),
|
||||
range_del_block(
|
||||
1 /* block_restart_interval */, true /* use_delta_encoding */,
|
||||
false /* use_value_delta_encoding */,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps),
|
||||
internal_prefix_transform(tbo.moptions.prefix_extractor.get()),
|
||||
compression_type(tbo.compression_type),
|
||||
sample_for_compression(tbo.moptions.sample_for_compression),
|
||||
|
@ -496,13 +521,13 @@ struct BlockBasedTableBuilder::Rep {
|
|||
BlockBasedTableOptions::kTwoLevelIndexSearch) {
|
||||
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
|
||||
&internal_comparator, use_delta_encoding_for_index_values,
|
||||
table_options);
|
||||
table_options, ts_sz, persist_user_defined_timestamps);
|
||||
index_builder.reset(p_index_builder_);
|
||||
} else {
|
||||
index_builder.reset(IndexBuilder::CreateIndexBuilder(
|
||||
table_options.index_type, &internal_comparator,
|
||||
&this->internal_prefix_transform, use_delta_encoding_for_index_values,
|
||||
table_options));
|
||||
table_options, ts_sz, persist_user_defined_timestamps));
|
||||
}
|
||||
if (ioptions.optimize_filters_for_hits && tbo.is_bottommost) {
|
||||
// Apply optimize_filters_for_hits setting here when applicable by
|
||||
|
@ -533,7 +558,8 @@ struct BlockBasedTableBuilder::Rep {
|
|||
|
||||
filter_builder.reset(CreateFilterBlockBuilder(
|
||||
ioptions, moptions, filter_context,
|
||||
use_delta_encoding_for_index_values, p_index_builder_));
|
||||
use_delta_encoding_for_index_values, p_index_builder_, ts_sz,
|
||||
persist_user_defined_timestamps));
|
||||
}
|
||||
|
||||
assert(tbo.int_tbl_prop_collector_factories);
|
||||
|
@ -548,11 +574,10 @@ struct BlockBasedTableBuilder::Rep {
|
|||
new BlockBasedTablePropertiesCollector(
|
||||
table_options.index_type, table_options.whole_key_filtering,
|
||||
moptions.prefix_extractor != nullptr));
|
||||
const Comparator* ucmp = tbo.internal_comparator.user_comparator();
|
||||
assert(ucmp);
|
||||
if (ucmp->timestamp_size() > 0) {
|
||||
if (ts_sz > 0 && persist_user_defined_timestamps) {
|
||||
table_properties_collectors.emplace_back(
|
||||
new TimestampTablePropertiesCollector(ucmp));
|
||||
new TimestampTablePropertiesCollector(
|
||||
tbo.internal_comparator.user_comparator()));
|
||||
}
|
||||
if (table_options.verify_compression) {
|
||||
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
|
||||
|
@ -910,7 +935,9 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
|
|||
// behavior
|
||||
sanitized_table_options.format_version = 1;
|
||||
}
|
||||
|
||||
auto ucmp = tbo.internal_comparator.user_comparator();
|
||||
assert(ucmp);
|
||||
(void)ucmp; // avoids unused variable error.
|
||||
rep_ = new Rep(sanitized_table_options, tbo, file);
|
||||
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
|
@ -994,9 +1021,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
|||
r->pc_rep->curr_block_keys->PushBack(key);
|
||||
} else {
|
||||
if (r->filter_builder != nullptr) {
|
||||
size_t ts_sz =
|
||||
r->internal_comparator.user_comparator()->timestamp_size();
|
||||
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
|
||||
r->filter_builder->Add(
|
||||
ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1017,6 +1043,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
|||
r->ioptions.logger);
|
||||
|
||||
} else if (value_type == kTypeRangeDeletion) {
|
||||
// TODO(yuzhangyu): handle range deletion entries for UDT in memtable only.
|
||||
r->range_del_block.Add(key, value);
|
||||
// TODO offset passed in is not accurate for parallel compression case
|
||||
NotifyCollectTableCollectorsOnAdd(key, value, r->get_offset(),
|
||||
|
@ -1028,6 +1055,9 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
|
|||
|
||||
r->props.num_entries++;
|
||||
r->props.raw_key_size += key.size();
|
||||
if (!r->persist_user_defined_timestamps) {
|
||||
r->props.raw_key_size -= r->ts_sz;
|
||||
}
|
||||
r->props.raw_value_size += value.size();
|
||||
if (value_type == kTypeDeletion || value_type == kTypeSingleDeletion ||
|
||||
value_type == kTypeDeletionWithTimestamp) {
|
||||
|
@ -1367,9 +1397,7 @@ void BlockBasedTableBuilder::BGWorkWriteMaybeCompressedBlock() {
|
|||
for (size_t i = 0; i < block_rep->keys->Size(); i++) {
|
||||
auto& key = (*block_rep->keys)[i];
|
||||
if (r->filter_builder != nullptr) {
|
||||
size_t ts_sz =
|
||||
r->internal_comparator.user_comparator()->timestamp_size();
|
||||
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
|
||||
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
|
||||
}
|
||||
r->index_builder->OnKeyAdded(key);
|
||||
}
|
||||
|
@ -1811,7 +1839,9 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
|
|||
|
||||
Block reader{BlockContents{data_block}};
|
||||
DataBlockIter* iter = reader.NewDataIterator(
|
||||
r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
|
||||
r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber,
|
||||
nullptr /* iter */, nullptr /* stats */,
|
||||
false /* block_contents_pinned */, r->persist_user_defined_timestamps);
|
||||
|
||||
iter->SeekToFirst();
|
||||
assert(iter->Valid());
|
||||
|
@ -1857,9 +1887,8 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
|
|||
for (; iter->Valid(); iter->Next()) {
|
||||
Slice key = iter->key();
|
||||
if (r->filter_builder != nullptr) {
|
||||
size_t ts_sz =
|
||||
r->internal_comparator.user_comparator()->timestamp_size();
|
||||
r->filter_builder->Add(ExtractUserKeyAndStripTimestamp(key, ts_sz));
|
||||
r->filter_builder->Add(
|
||||
ExtractUserKeyAndStripTimestamp(key, r->ts_sz));
|
||||
}
|
||||
r->index_builder->OnKeyAdded(key);
|
||||
}
|
||||
|
|
|
@ -576,7 +576,8 @@ Status BlockBasedTableFactory::NewTableReader(
|
|||
table_reader_options.block_cache_tracer,
|
||||
table_reader_options.max_file_size_for_l0_meta_pin,
|
||||
table_reader_options.cur_db_session_id, table_reader_options.cur_file_num,
|
||||
table_reader_options.unique_id);
|
||||
table_reader_options.unique_id,
|
||||
table_reader_options.user_defined_timestamps_persisted);
|
||||
}
|
||||
|
||||
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
|
||||
|
|
|
@ -570,7 +570,8 @@ Status BlockBasedTable::Open(
|
|||
TailPrefetchStats* tail_prefetch_stats,
|
||||
BlockCacheTracer* const block_cache_tracer,
|
||||
size_t max_file_size_for_l0_meta_pin, const std::string& cur_db_session_id,
|
||||
uint64_t cur_file_num, UniqueId64x2 expected_unique_id) {
|
||||
uint64_t cur_file_num, UniqueId64x2 expected_unique_id,
|
||||
const bool user_defined_timestamps_persisted) {
|
||||
table_reader->reset();
|
||||
|
||||
Status s;
|
||||
|
@ -631,9 +632,9 @@ Status BlockBasedTable::Open(
|
|||
}
|
||||
|
||||
BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch};
|
||||
Rep* rep = new BlockBasedTable::Rep(ioptions, env_options, table_options,
|
||||
internal_comparator, skip_filters,
|
||||
file_size, level, immortal_table);
|
||||
Rep* rep = new BlockBasedTable::Rep(
|
||||
ioptions, env_options, table_options, internal_comparator, skip_filters,
|
||||
file_size, level, immortal_table, user_defined_timestamps_persisted);
|
||||
rep->file = std::move(file);
|
||||
rep->footer = footer;
|
||||
|
||||
|
@ -763,6 +764,7 @@ Status BlockBasedTable::Open(
|
|||
PersistentCacheOptions(rep->table_options.persistent_cache,
|
||||
rep->base_cache_key, rep->ioptions.stats);
|
||||
|
||||
// TODO(yuzhangyu): handle range deletion entries for UDT in memtable only.
|
||||
s = new_table->ReadRangeDelBlock(ro, prefetch_buffer.get(),
|
||||
metaindex_iter.get(), internal_comparator,
|
||||
&lookup_context);
|
||||
|
@ -1456,7 +1458,8 @@ DataBlockIter* BlockBasedTable::InitBlockIterator<DataBlockIter>(
|
|||
DataBlockIter* input_iter, bool block_contents_pinned) {
|
||||
return block->NewDataIterator(rep->internal_comparator.user_comparator(),
|
||||
rep->get_global_seqno(block_type), input_iter,
|
||||
rep->ioptions.stats, block_contents_pinned);
|
||||
rep->ioptions.stats, block_contents_pinned,
|
||||
rep->user_defined_timestamps_persisted);
|
||||
}
|
||||
|
||||
// TODO?
|
||||
|
@ -1469,7 +1472,7 @@ IndexBlockIter* BlockBasedTable::InitBlockIterator<IndexBlockIter>(
|
|||
rep->get_global_seqno(block_type), input_iter, rep->ioptions.stats,
|
||||
/* total_order_seek */ true, rep->index_has_first_key,
|
||||
rep->index_key_includes_seq, rep->index_value_is_full,
|
||||
block_contents_pinned);
|
||||
block_contents_pinned, rep->user_defined_timestamps_persisted);
|
||||
}
|
||||
|
||||
// If contents is nullptr, this function looks up the block caches for the
|
||||
|
|
|
@ -111,7 +111,8 @@ class BlockBasedTable : public TableReader {
|
|||
BlockCacheTracer* const block_cache_tracer = nullptr,
|
||||
size_t max_file_size_for_l0_meta_pin = 0,
|
||||
const std::string& cur_db_session_id = "", uint64_t cur_file_num = 0,
|
||||
UniqueId64x2 expected_unique_id = {});
|
||||
UniqueId64x2 expected_unique_id = {},
|
||||
const bool user_defined_timestamps_persisted = true);
|
||||
|
||||
bool PrefixRangeMayMatch(const Slice& internal_key,
|
||||
const ReadOptions& read_options,
|
||||
|
@ -549,7 +550,8 @@ struct BlockBasedTable::Rep {
|
|||
Rep(const ImmutableOptions& _ioptions, const EnvOptions& _env_options,
|
||||
const BlockBasedTableOptions& _table_opt,
|
||||
const InternalKeyComparator& _internal_comparator, bool skip_filters,
|
||||
uint64_t _file_size, int _level, const bool _immortal_table)
|
||||
uint64_t _file_size, int _level, const bool _immortal_table,
|
||||
const bool _user_defined_timestamps_persisted = true)
|
||||
: ioptions(_ioptions),
|
||||
env_options(_env_options),
|
||||
table_options(_table_opt),
|
||||
|
@ -562,7 +564,8 @@ struct BlockBasedTable::Rep {
|
|||
global_seqno(kDisableGlobalSequenceNumber),
|
||||
file_size(_file_size),
|
||||
level(_level),
|
||||
immortal_table(_immortal_table) {}
|
||||
immortal_table(_immortal_table),
|
||||
user_defined_timestamps_persisted(_user_defined_timestamps_persisted) {}
|
||||
~Rep() { status.PermitUncheckedError(); }
|
||||
const ImmutableOptions& ioptions;
|
||||
const EnvOptions& env_options;
|
||||
|
@ -635,6 +638,15 @@ struct BlockBasedTable::Rep {
|
|||
bool index_value_is_full = true;
|
||||
|
||||
const bool immortal_table;
|
||||
// Whether the user key contains user-defined timestamps. If this is false and
|
||||
// the running user comparator has a non-zero timestamp size, a min timestamp
|
||||
// of this size will be padded to each user key while parsing blocks whenever
|
||||
// it applies. This includes the keys in data block, index block for data
|
||||
// block, top-level index for index partitions (if index type is
|
||||
// `kTwoLevelIndexSearch`), top-level index for filter partitions (if using
|
||||
// partitioned filters), the `first_internal_key` in `IndexValue`, the
|
||||
// `end_key` for range deletion entries.
|
||||
const bool user_defined_timestamps_persisted;
|
||||
|
||||
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
|
||||
table_reader_cache_res_handle = nullptr;
|
||||
|
|
|
@ -38,8 +38,8 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
// complexity: human readable strings are easier to compress than random
|
||||
// strings.
|
||||
static std::map<std::string, std::string> GenerateKVMap(
|
||||
int num_block = 100,
|
||||
bool mixed_with_human_readable_string_value = false) {
|
||||
int num_block = 100, bool mixed_with_human_readable_string_value = false,
|
||||
size_t ts_sz = 0) {
|
||||
std::map<std::string, std::string> kv;
|
||||
|
||||
Random rnd(101);
|
||||
|
@ -58,7 +58,13 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
} else {
|
||||
v = rnd.RandomString(256);
|
||||
}
|
||||
kv[std::string(k)] = v;
|
||||
if (ts_sz > 0) {
|
||||
std::string user_key;
|
||||
AppendKeyWithMinTimestamp(&user_key, std::string(k), ts_sz);
|
||||
kv[user_key] = v;
|
||||
} else {
|
||||
kv[std::string(k)] = v;
|
||||
}
|
||||
key++;
|
||||
}
|
||||
}
|
||||
|
@ -80,21 +86,29 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
|
||||
// Creates a table with the specificied key value pairs (kv).
|
||||
void CreateTable(const std::string& table_name,
|
||||
const ImmutableOptions& ioptions,
|
||||
const CompressionType& compression_type,
|
||||
const std::map<std::string, std::string>& kv) {
|
||||
const std::map<std::string, std::string>& kv,
|
||||
uint32_t compression_parallel_threads = 1,
|
||||
uint32_t compression_dict_bytes = 0) {
|
||||
std::unique_ptr<WritableFileWriter> writer;
|
||||
NewFileWriter(table_name, &writer);
|
||||
|
||||
// Create table builder.
|
||||
ImmutableOptions ioptions(options_);
|
||||
InternalKeyComparator comparator(options_.comparator);
|
||||
InternalKeyComparator comparator(ioptions.user_comparator);
|
||||
ColumnFamilyOptions cf_options;
|
||||
cf_options.prefix_extractor = options_.prefix_extractor;
|
||||
MutableCFOptions moptions(cf_options);
|
||||
CompressionOptions compression_opts;
|
||||
compression_opts.parallel_threads = compression_parallel_threads;
|
||||
// Enable compression dictionary and set a buffering limit that is the same
|
||||
// as each block's size.
|
||||
compression_opts.max_dict_bytes = compression_dict_bytes;
|
||||
compression_opts.max_dict_buffer_bytes = compression_dict_bytes;
|
||||
IntTblPropCollectorFactories factories;
|
||||
std::unique_ptr<TableBuilder> table_builder(
|
||||
options_.table_factory->NewTableBuilder(
|
||||
TableBuilderOptions(ioptions, moptions, comparator, &factories,
|
||||
compression_type, CompressionOptions(),
|
||||
compression_type, compression_opts,
|
||||
0 /* column_family_id */,
|
||||
kDefaultColumnFamilyName, -1 /* level */),
|
||||
writer.get()));
|
||||
|
@ -114,11 +128,17 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
const std::string& table_name,
|
||||
std::unique_ptr<BlockBasedTable>* table,
|
||||
bool prefetch_index_and_filter_in_cache = true,
|
||||
Status* status = nullptr) {
|
||||
Status* status = nullptr,
|
||||
bool user_defined_timestamps_persisted = true) {
|
||||
const MutableCFOptions moptions(options_);
|
||||
TableReaderOptions table_reader_options =
|
||||
TableReaderOptions(ioptions, moptions.prefix_extractor, EnvOptions(),
|
||||
comparator, 0 /* block_protection_bytes_per_key */);
|
||||
TableReaderOptions table_reader_options = TableReaderOptions(
|
||||
ioptions, moptions.prefix_extractor, EnvOptions(), comparator,
|
||||
0 /* block_protection_bytes_per_key */, false /* _skip_filters */,
|
||||
false /* _immortal */, false /* _force_direct_prefetch */,
|
||||
-1 /* _level */, nullptr /* _block_cache_tracer */,
|
||||
0 /* _max_file_size_for_l0_meta_pin */, "" /* _cur_db_session_id */,
|
||||
0 /* _cur_file_num */, {} /* _unique_id */, 0 /* _largest_seqno */,
|
||||
0 /* _tail_size */, user_defined_timestamps_persisted);
|
||||
|
||||
std::unique_ptr<RandomAccessFileReader> file;
|
||||
NewFileReader(table_name, foptions, &file);
|
||||
|
@ -126,9 +146,11 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
uint64_t file_size = 0;
|
||||
ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));
|
||||
|
||||
ReadOptions read_opts;
|
||||
read_opts.verify_checksums = true;
|
||||
std::unique_ptr<TableReader> general_table;
|
||||
Status s = options_.table_factory->NewTableReader(
|
||||
ReadOptions(), table_reader_options, std::move(file), file_size,
|
||||
read_opts, table_reader_options, std::move(file), file_size,
|
||||
&general_table, prefetch_index_and_filter_in_cache);
|
||||
|
||||
if (s.ok()) {
|
||||
|
@ -147,6 +169,11 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
std::shared_ptr<FileSystem> fs_;
|
||||
Options options_;
|
||||
|
||||
std::string ToInternalKey(const std::string& key) {
|
||||
InternalKey internal_key(key, 0, ValueType::kTypeValue);
|
||||
return internal_key.Encode().ToString();
|
||||
}
|
||||
|
||||
private:
|
||||
void WriteToFile(const std::string& content, const std::string& filename) {
|
||||
std::unique_ptr<FSWritableFile> f;
|
||||
|
@ -173,21 +200,31 @@ class BlockBasedTableReaderBaseTest : public testing::Test {
|
|||
reader->reset(new RandomAccessFileReader(std::move(f), path,
|
||||
env_->GetSystemClock().get()));
|
||||
}
|
||||
|
||||
std::string ToInternalKey(const std::string& key) {
|
||||
InternalKey internal_key(key, 0, ValueType::kTypeValue);
|
||||
return internal_key.Encode().ToString();
|
||||
}
|
||||
};
|
||||
|
||||
// Param 1: compression type
|
||||
// Param 2: whether to use direct reads
|
||||
// Param 3: Block Based Table Index type
|
||||
// Param 4: BBTO no_block_cache option
|
||||
// Param 5: test mode for the user-defined timestamp feature
|
||||
// Param 6: number of parallel compression threads
|
||||
// Param 7: CompressionOptions.max_dict_bytes and
|
||||
// CompressionOptions.max_dict_buffer_bytes to enable/disable
|
||||
// compression dictionary.
|
||||
class BlockBasedTableReaderTest
|
||||
: public BlockBasedTableReaderBaseTest,
|
||||
public testing::WithParamInterface<std::tuple<
|
||||
CompressionType, bool, BlockBasedTableOptions::IndexType, bool>> {
|
||||
CompressionType, bool, BlockBasedTableOptions::IndexType, bool,
|
||||
test::UserDefinedTimestampTestMode, uint32_t, uint32_t>> {
|
||||
protected:
|
||||
void SetUp() override {
|
||||
compression_type_ = std::get<0>(GetParam());
|
||||
use_direct_reads_ = std::get<1>(GetParam());
|
||||
test::UserDefinedTimestampTestMode udt_test_mode = std::get<4>(GetParam());
|
||||
udt_enabled_ = test::IsUDTEnabled(udt_test_mode);
|
||||
persist_udt_ = test::ShouldPersistUDT(udt_test_mode);
|
||||
compression_parallel_threads_ = std::get<5>(GetParam());
|
||||
compression_dict_bytes_ = std::get<6>(GetParam());
|
||||
BlockBasedTableReaderBaseTest::SetUp();
|
||||
}
|
||||
|
||||
|
@ -195,24 +232,45 @@ class BlockBasedTableReaderTest
|
|||
BlockBasedTableOptions opts;
|
||||
opts.index_type = std::get<2>(GetParam());
|
||||
opts.no_block_cache = std::get<3>(GetParam());
|
||||
opts.filter_policy.reset(NewBloomFilterPolicy(10, false));
|
||||
opts.partition_filters =
|
||||
opts.index_type ==
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
|
||||
options_.table_factory.reset(
|
||||
static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
|
||||
options_.prefix_extractor =
|
||||
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(3));
|
||||
}
|
||||
|
||||
CompressionType compression_type_;
|
||||
bool use_direct_reads_;
|
||||
bool udt_enabled_;
|
||||
bool persist_udt_;
|
||||
uint32_t compression_parallel_threads_;
|
||||
uint32_t compression_dict_bytes_;
|
||||
};
|
||||
|
||||
// Tests MultiGet in both direct IO and non-direct IO mode.
|
||||
// The keys should be in cache after MultiGet.
|
||||
TEST_P(BlockBasedTableReaderTest, MultiGet) {
|
||||
Options options;
|
||||
ReadOptions read_opts;
|
||||
std::string dummy_ts(sizeof(uint64_t), '\0');
|
||||
Slice read_timestamp = dummy_ts;
|
||||
if (udt_enabled_) {
|
||||
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||
read_opts.timestamp = &read_timestamp;
|
||||
}
|
||||
options.persist_user_defined_timestamps = persist_udt_;
|
||||
size_t ts_sz = options.comparator->timestamp_size();
|
||||
std::map<std::string, std::string> kv =
|
||||
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
||||
100 /* num_block */,
|
||||
true /* mixed_with_human_readable_string_value */);
|
||||
true /* mixed_with_human_readable_string_value */, ts_sz);
|
||||
|
||||
// Prepare keys, values, and statuses for MultiGet.
|
||||
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
|
||||
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys_without_timestamps;
|
||||
autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
|
||||
autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
|
||||
{
|
||||
|
@ -221,27 +279,40 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
|
|||
auto it = kv.begin();
|
||||
for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
|
||||
keys.emplace_back(it->first);
|
||||
if (ts_sz > 0) {
|
||||
Slice ukey_without_ts = StripTimestampFromUserKey(it->first, ts_sz);
|
||||
keys_without_timestamps.push_back(ukey_without_ts);
|
||||
} else {
|
||||
keys_without_timestamps.emplace_back(it->first);
|
||||
}
|
||||
values.emplace_back();
|
||||
statuses.emplace_back();
|
||||
std::advance(it, step);
|
||||
}
|
||||
}
|
||||
|
||||
std::string table_name =
|
||||
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
|
||||
CreateTable(table_name, compression_type_, kv);
|
||||
std::string table_name = "BlockBasedTableReaderTest_MultiGet" +
|
||||
CompressionTypeToString(compression_type_);
|
||||
|
||||
ImmutableOptions ioptions(options);
|
||||
CreateTable(table_name, ioptions, compression_type_, kv,
|
||||
compression_parallel_threads_, compression_dict_bytes_);
|
||||
|
||||
std::unique_ptr<BlockBasedTable> table;
|
||||
Options options;
|
||||
ImmutableOptions ioptions(options);
|
||||
FileOptions foptions;
|
||||
foptions.use_direct_reads = use_direct_reads_;
|
||||
InternalKeyComparator comparator(options.comparator);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
||||
true /* bool prefetch_index_and_filter_in_cache */,
|
||||
nullptr /* status */, persist_udt_);
|
||||
|
||||
ASSERT_OK(
|
||||
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
||||
|
||||
// Ensure that keys are not in cache before MultiGet.
|
||||
for (auto& key : keys) {
|
||||
ASSERT_FALSE(table->TEST_KeyInCache(ReadOptions(), key));
|
||||
std::string ikey = ToInternalKey(key.ToString());
|
||||
ASSERT_FALSE(table->TEST_KeyInCache(read_opts, ikey));
|
||||
}
|
||||
|
||||
// Prepare MultiGetContext.
|
||||
|
@ -249,26 +320,26 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
|
|||
autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
|
||||
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
get_context.emplace_back(BytewiseComparator(), nullptr, nullptr, nullptr,
|
||||
get_context.emplace_back(options.comparator, nullptr, nullptr, nullptr,
|
||||
GetContext::kNotFound, keys[i], &values[i],
|
||||
nullptr, nullptr, nullptr, nullptr,
|
||||
true /* do_merge */, nullptr, nullptr, nullptr,
|
||||
nullptr, nullptr, nullptr);
|
||||
key_context.emplace_back(nullptr, keys[i], &values[i], nullptr, nullptr,
|
||||
&statuses.back());
|
||||
key_context.emplace_back(nullptr, keys_without_timestamps[i], &values[i],
|
||||
nullptr, nullptr, &statuses.back());
|
||||
key_context.back().get_context = &get_context.back();
|
||||
}
|
||||
for (auto& key_ctx : key_context) {
|
||||
sorted_keys.emplace_back(&key_ctx);
|
||||
}
|
||||
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions(),
|
||||
MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, read_opts,
|
||||
fs_.get(), nullptr);
|
||||
|
||||
// Execute MultiGet.
|
||||
MultiGetContext::Range range = ctx.GetMultiGetRange();
|
||||
PerfContext* perf_ctx = get_perf_context();
|
||||
perf_ctx->Reset();
|
||||
table->MultiGet(ReadOptions(), &range, nullptr);
|
||||
table->MultiGet(read_opts, &range, nullptr);
|
||||
|
||||
ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count -
|
||||
perf_ctx->filter_block_read_count -
|
||||
|
@ -281,11 +352,78 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
|
|||
}
|
||||
// Check that keys are in cache after MultiGet.
|
||||
for (size_t i = 0; i < keys.size(); i++) {
|
||||
ASSERT_TRUE(table->TEST_KeyInCache(ReadOptions(), keys[i]));
|
||||
std::string ikey = ToInternalKey(keys[i].ToString());
|
||||
ASSERT_TRUE(table->TEST_KeyInCache(read_opts, ikey));
|
||||
ASSERT_EQ(values[i].ToString(), kv[keys[i].ToString()]);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_P(BlockBasedTableReaderTest, NewIterator) {
|
||||
Options options;
|
||||
ReadOptions read_opts;
|
||||
std::string dummy_ts(sizeof(uint64_t), '\0');
|
||||
Slice read_timestamp = dummy_ts;
|
||||
if (udt_enabled_) {
|
||||
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||
read_opts.timestamp = &read_timestamp;
|
||||
}
|
||||
options.persist_user_defined_timestamps = persist_udt_;
|
||||
size_t ts_sz = options.comparator->timestamp_size();
|
||||
std::map<std::string, std::string> kv =
|
||||
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
||||
100 /* num_block */,
|
||||
true /* mixed_with_human_readable_string_value */, ts_sz);
|
||||
|
||||
std::string table_name = "BlockBasedTableReaderTest_NewIterator" +
|
||||
CompressionTypeToString(compression_type_);
|
||||
|
||||
ImmutableOptions ioptions(options);
|
||||
CreateTable(table_name, ioptions, compression_type_, kv,
|
||||
compression_parallel_threads_, compression_dict_bytes_);
|
||||
|
||||
std::unique_ptr<BlockBasedTable> table;
|
||||
FileOptions foptions;
|
||||
foptions.use_direct_reads = use_direct_reads_;
|
||||
InternalKeyComparator comparator(options.comparator);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
||||
true /* bool prefetch_index_and_filter_in_cache */,
|
||||
nullptr /* status */, persist_udt_);
|
||||
ASSERT_OK(
|
||||
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
|
||||
|
||||
std::unique_ptr<InternalIterator> iter;
|
||||
iter.reset(table->NewIterator(
|
||||
read_opts, options_.prefix_extractor.get(), /*arena=*/nullptr,
|
||||
/*skip_filters=*/false, TableReaderCaller::kUncategorized));
|
||||
|
||||
// Test forward scan.
|
||||
ASSERT_TRUE(!iter->Valid());
|
||||
iter->SeekToFirst();
|
||||
ASSERT_OK(iter->status());
|
||||
for (auto kv_iter = kv.begin(); kv_iter != kv.end(); kv_iter++) {
|
||||
std::string ikey = ToInternalKey(kv_iter->first);
|
||||
ASSERT_EQ(iter->key().ToString(), ikey);
|
||||
ASSERT_EQ(iter->value().ToString(), kv_iter->second);
|
||||
iter->Next();
|
||||
ASSERT_OK(iter->status());
|
||||
}
|
||||
ASSERT_TRUE(!iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
|
||||
// Test backward scan.
|
||||
iter->SeekToLast();
|
||||
ASSERT_OK(iter->status());
|
||||
for (auto kv_iter = kv.rbegin(); kv_iter != kv.rend(); kv_iter++) {
|
||||
std::string ikey = ToInternalKey(kv_iter->first);
|
||||
ASSERT_EQ(iter->key().ToString(), ikey);
|
||||
ASSERT_EQ(iter->value().ToString(), kv_iter->second);
|
||||
iter->Prev();
|
||||
ASSERT_OK(iter->status());
|
||||
}
|
||||
ASSERT_TRUE(!iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
}
|
||||
|
||||
class ChargeTableReaderTest
|
||||
: public BlockBasedTableReaderBaseTest,
|
||||
public testing::WithParamInterface<
|
||||
|
@ -374,7 +512,8 @@ class ChargeTableReaderTest
|
|||
std::size_t approx_table_reader_mem = 0;
|
||||
|
||||
std::string table_name = "table_for_approx_table_reader_mem";
|
||||
CreateTable(table_name, compression_type_, kv_);
|
||||
ImmutableOptions ioptions(options_);
|
||||
CreateTable(table_name, ioptions, compression_type_, kv_);
|
||||
|
||||
std::unique_ptr<BlockBasedTable> table;
|
||||
Status s;
|
||||
|
@ -424,13 +563,14 @@ TEST_P(ChargeTableReaderTest, Basic) {
|
|||
std::size_t opened_table_reader_num = 0;
|
||||
std::string table_name;
|
||||
std::vector<std::unique_ptr<BlockBasedTable>> tables;
|
||||
ImmutableOptions ioptions(options_);
|
||||
// Keep creating BlockBasedTableReader till hiting the memory limit based on
|
||||
// cache capacity and creation fails (when charge_table_reader_ ==
|
||||
// kEnabled) or reaching a specfied big number of table readers (when
|
||||
// charge_table_reader_ == kDisabled)
|
||||
while (s.ok() && opened_table_reader_num < max_table_reader_num_uncapped) {
|
||||
table_name = "table_" + std::to_string(opened_table_reader_num);
|
||||
CreateTable(table_name, compression_type_, kv_);
|
||||
CreateTable(table_name, ioptions, compression_type_, kv_);
|
||||
tables.push_back(std::unique_ptr<BlockBasedTable>());
|
||||
NewBlockBasedTableReader(
|
||||
FileOptions(), ImmutableOptions(options_),
|
||||
|
@ -465,7 +605,7 @@ TEST_P(ChargeTableReaderTest, Basic) {
|
|||
--opened_table_reader_num;
|
||||
}
|
||||
table_name = "table_for_successful_table_reader_open";
|
||||
CreateTable(table_name, compression_type_, kv_);
|
||||
CreateTable(table_name, ioptions, compression_type_, kv_);
|
||||
tables.push_back(std::unique_ptr<BlockBasedTable>());
|
||||
NewBlockBasedTableReader(
|
||||
FileOptions(), ImmutableOptions(options_),
|
||||
|
@ -491,28 +631,42 @@ class BlockBasedTableReaderTestVerifyChecksum
|
|||
};
|
||||
|
||||
TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
|
||||
std::map<std::string, std::string> kv =
|
||||
BlockBasedTableReaderBaseTest::GenerateKVMap(800 /* num_block */);
|
||||
|
||||
std::string table_name =
|
||||
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
|
||||
CreateTable(table_name, compression_type_, kv);
|
||||
|
||||
std::unique_ptr<BlockBasedTable> table;
|
||||
Options options;
|
||||
ReadOptions read_opts;
|
||||
std::string dummy_ts(sizeof(uint64_t), '\0');
|
||||
Slice read_timestamp = dummy_ts;
|
||||
if (udt_enabled_) {
|
||||
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||
read_opts.timestamp = &read_timestamp;
|
||||
}
|
||||
options.persist_user_defined_timestamps = persist_udt_;
|
||||
size_t ts_sz = options.comparator->timestamp_size();
|
||||
std::map<std::string, std::string> kv =
|
||||
BlockBasedTableReaderBaseTest::GenerateKVMap(
|
||||
800 /* num_block */,
|
||||
false /* mixed_with_human_readable_string_value=*/, ts_sz);
|
||||
|
||||
options.statistics = CreateDBStatistics();
|
||||
ImmutableOptions ioptions(options);
|
||||
std::string table_name =
|
||||
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);
|
||||
CreateTable(table_name, ioptions, compression_type_, kv,
|
||||
compression_parallel_threads_, compression_dict_bytes_);
|
||||
|
||||
std::unique_ptr<BlockBasedTable> table;
|
||||
FileOptions foptions;
|
||||
foptions.use_direct_reads = use_direct_reads_;
|
||||
InternalKeyComparator comparator(options.comparator);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
||||
true /* bool prefetch_index_and_filter_in_cache */,
|
||||
nullptr /* status */, persist_udt_);
|
||||
|
||||
// Use the top level iterator to find the offset/size of the first
|
||||
// 2nd level index block and corrupt the block
|
||||
IndexBlockIter iiter_on_stack;
|
||||
BlockCacheLookupContext context{TableReaderCaller::kUserVerifyChecksum};
|
||||
InternalIteratorBase<IndexValue>* iiter = table->NewIndexIterator(
|
||||
ReadOptions(), /*disable_prefix_seek=*/false, &iiter_on_stack,
|
||||
read_opts, /*need_upper_bound_check=*/false, &iiter_on_stack,
|
||||
/*get_context=*/nullptr, &context);
|
||||
std::unique_ptr<InternalIteratorBase<IndexValue>> iiter_unique_ptr;
|
||||
if (iiter != &iiter_on_stack) {
|
||||
|
@ -529,11 +683,13 @@ TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
|
|||
ASSERT_OK(test::CorruptFile(options.env, Path(table_name),
|
||||
static_cast<int>(handle.offset()), 128));
|
||||
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table);
|
||||
NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
|
||||
true /* bool prefetch_index_and_filter_in_cache */,
|
||||
nullptr /* status */, persist_udt_);
|
||||
ASSERT_EQ(0,
|
||||
options.statistics->getTickerCount(BLOCK_CHECKSUM_MISMATCH_COUNT));
|
||||
Status s = table->VerifyChecksum(ReadOptions(),
|
||||
TableReaderCaller::kUserVerifyChecksum);
|
||||
Status s =
|
||||
table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum);
|
||||
ASSERT_EQ(1,
|
||||
options.statistics->getTickerCount(BLOCK_CHECKSUM_MISMATCH_COUNT));
|
||||
ASSERT_EQ(s.code(), Status::kCorruption);
|
||||
|
@ -541,14 +697,25 @@ TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
|
|||
|
||||
// Param 1: compression type
|
||||
// Param 2: whether to use direct reads
|
||||
// Param 3: Block Based Table Index type
|
||||
// Param 3: Block Based Table Index type, partitioned filters are also enabled
|
||||
// when index type is kTwoLevelIndexSearch
|
||||
// Param 4: BBTO no_block_cache option
|
||||
// Param 5: test mode for the user-defined timestamp feature
|
||||
// Param 6: number of parallel compression threads
|
||||
// Param 7: CompressionOptions.max_dict_bytes and
|
||||
// CompressionOptions.max_dict_buffer_bytes. This enable/disables
|
||||
// compression dictionary.
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
MultiGet, BlockBasedTableReaderTest,
|
||||
BlockBasedTableReaderTest, BlockBasedTableReaderTest,
|
||||
::testing::Combine(
|
||||
::testing::ValuesIn(GetSupportedCompressions()), ::testing::Bool(),
|
||||
::testing::Values(BlockBasedTableOptions::IndexType::kBinarySearch),
|
||||
::testing::Values(false)));
|
||||
::testing::Values(
|
||||
BlockBasedTableOptions::IndexType::kBinarySearch,
|
||||
BlockBasedTableOptions::IndexType::kHashSearch,
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch,
|
||||
BlockBasedTableOptions::IndexType::kBinarySearchWithFirstKey),
|
||||
::testing::Values(false), ::testing::ValuesIn(test::GetUDTTestModes()),
|
||||
::testing::Values(1, 2), ::testing::Values(0, 4096)));
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
|
||||
::testing::Combine(
|
||||
|
@ -556,7 +723,8 @@ INSTANTIATE_TEST_CASE_P(
|
|||
::testing::Values(false),
|
||||
::testing::Values(
|
||||
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch),
|
||||
::testing::Values(true)));
|
||||
::testing::Values(true), ::testing::ValuesIn(test::GetUDTTestModes()),
|
||||
::testing::Values(1, 2), ::testing::Values(0)));
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
|
|
|
@ -137,7 +137,7 @@ InternalIteratorBase<IndexValue>* HashIndexReader::NewIterator(
|
|||
rep->get_global_seqno(BlockType::kIndex), iter, kNullStats,
|
||||
total_order_seek, index_has_first_key(), index_key_includes_seq(),
|
||||
index_value_is_full(), false /* block_contents_pinned */,
|
||||
true /* user_defined_timestamps_persisted */, prefix_index_.get());
|
||||
user_defined_timestamps_persisted(), prefix_index_.get());
|
||||
|
||||
assert(it != nullptr);
|
||||
index_block.TransferTo(it);
|
||||
|
|
|
@ -29,14 +29,16 @@ IndexBuilder* IndexBuilder::CreateIndexBuilder(
|
|||
const InternalKeyComparator* comparator,
|
||||
const InternalKeySliceTransform* int_key_slice_transform,
|
||||
const bool use_value_delta_encoding,
|
||||
const BlockBasedTableOptions& table_opt) {
|
||||
const BlockBasedTableOptions& table_opt, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps) {
|
||||
IndexBuilder* result = nullptr;
|
||||
switch (index_type) {
|
||||
case BlockBasedTableOptions::kBinarySearch: {
|
||||
result = new ShortenedIndexBuilder(
|
||||
comparator, table_opt.index_block_restart_interval,
|
||||
table_opt.format_version, use_value_delta_encoding,
|
||||
table_opt.index_shortening, /* include_first_key */ false);
|
||||
table_opt.index_shortening, /* include_first_key */ false, ts_sz,
|
||||
persist_user_defined_timestamps);
|
||||
break;
|
||||
}
|
||||
case BlockBasedTableOptions::kHashSearch: {
|
||||
|
@ -46,19 +48,22 @@ IndexBuilder* IndexBuilder::CreateIndexBuilder(
|
|||
result = new HashIndexBuilder(
|
||||
comparator, int_key_slice_transform,
|
||||
table_opt.index_block_restart_interval, table_opt.format_version,
|
||||
use_value_delta_encoding, table_opt.index_shortening);
|
||||
use_value_delta_encoding, table_opt.index_shortening, ts_sz,
|
||||
persist_user_defined_timestamps);
|
||||
break;
|
||||
}
|
||||
case BlockBasedTableOptions::kTwoLevelIndexSearch: {
|
||||
result = PartitionedIndexBuilder::CreateIndexBuilder(
|
||||
comparator, use_value_delta_encoding, table_opt);
|
||||
comparator, use_value_delta_encoding, table_opt, ts_sz,
|
||||
persist_user_defined_timestamps);
|
||||
break;
|
||||
}
|
||||
case BlockBasedTableOptions::kBinarySearchWithFirstKey: {
|
||||
result = new ShortenedIndexBuilder(
|
||||
comparator, table_opt.index_block_restart_interval,
|
||||
table_opt.format_version, use_value_delta_encoding,
|
||||
table_opt.index_shortening, /* include_first_key */ true);
|
||||
table_opt.index_shortening, /* include_first_key */ true, ts_sz,
|
||||
persist_user_defined_timestamps);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
|
@ -106,22 +111,31 @@ void ShortenedIndexBuilder::FindShortInternalKeySuccessor(
|
|||
PartitionedIndexBuilder* PartitionedIndexBuilder::CreateIndexBuilder(
|
||||
const InternalKeyComparator* comparator,
|
||||
const bool use_value_delta_encoding,
|
||||
const BlockBasedTableOptions& table_opt) {
|
||||
const BlockBasedTableOptions& table_opt, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps) {
|
||||
return new PartitionedIndexBuilder(comparator, table_opt,
|
||||
use_value_delta_encoding);
|
||||
use_value_delta_encoding, ts_sz,
|
||||
persist_user_defined_timestamps);
|
||||
}
|
||||
|
||||
PartitionedIndexBuilder::PartitionedIndexBuilder(
|
||||
const InternalKeyComparator* comparator,
|
||||
const BlockBasedTableOptions& table_opt,
|
||||
const bool use_value_delta_encoding)
|
||||
: IndexBuilder(comparator),
|
||||
index_block_builder_(table_opt.index_block_restart_interval,
|
||||
true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding),
|
||||
index_block_builder_without_seq_(table_opt.index_block_restart_interval,
|
||||
true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding),
|
||||
const bool use_value_delta_encoding, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps)
|
||||
: IndexBuilder(comparator, ts_sz, persist_user_defined_timestamps),
|
||||
index_block_builder_(
|
||||
table_opt.index_block_restart_interval, true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps, false /* is_user_key */),
|
||||
index_block_builder_without_seq_(
|
||||
table_opt.index_block_restart_interval, true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps, true /* is_user_key */),
|
||||
sub_index_builder_(nullptr),
|
||||
table_opt_(table_opt),
|
||||
// We start by false. After each partition we revise the value based on
|
||||
|
@ -142,7 +156,8 @@ void PartitionedIndexBuilder::MakeNewSubIndexBuilder() {
|
|||
sub_index_builder_ = new ShortenedIndexBuilder(
|
||||
comparator_, table_opt_.index_block_restart_interval,
|
||||
table_opt_.format_version, use_value_delta_encoding_,
|
||||
table_opt_.index_shortening, /* include_first_key */ false);
|
||||
table_opt_.index_shortening, /* include_first_key */ false, ts_sz_,
|
||||
persist_user_defined_timestamps_);
|
||||
|
||||
// Set sub_index_builder_->seperator_is_key_plus_seq_ to true if
|
||||
// seperator_is_key_plus_seq_ is true (internal-key mode) (set to false by
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "table/block_based/block_based_table_factory.h"
|
||||
#include "table/block_based/block_builder.h"
|
||||
|
@ -39,7 +40,8 @@ class IndexBuilder {
|
|||
const ROCKSDB_NAMESPACE::InternalKeyComparator* comparator,
|
||||
const InternalKeySliceTransform* int_key_slice_transform,
|
||||
const bool use_value_delta_encoding,
|
||||
const BlockBasedTableOptions& table_opt);
|
||||
const BlockBasedTableOptions& table_opt, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps);
|
||||
|
||||
// Index builder will construct a set of blocks which contain:
|
||||
// 1. One primary index block.
|
||||
|
@ -49,8 +51,11 @@ class IndexBuilder {
|
|||
Slice index_block_contents;
|
||||
std::unordered_map<std::string, Slice> meta_blocks;
|
||||
};
|
||||
explicit IndexBuilder(const InternalKeyComparator* comparator)
|
||||
: comparator_(comparator) {}
|
||||
explicit IndexBuilder(const InternalKeyComparator* comparator, size_t ts_sz,
|
||||
bool persist_user_defined_timestamps)
|
||||
: comparator_(comparator),
|
||||
ts_sz_(ts_sz),
|
||||
persist_user_defined_timestamps_(persist_user_defined_timestamps) {}
|
||||
|
||||
virtual ~IndexBuilder() {}
|
||||
|
||||
|
@ -105,6 +110,13 @@ class IndexBuilder {
|
|||
|
||||
protected:
|
||||
const InternalKeyComparator* comparator_;
|
||||
// Size of user-defined timestamp in bytes.
|
||||
size_t ts_sz_;
|
||||
// Whether user-defined timestamp in the user key should be persisted when
|
||||
// creating index block. If this flag is false, user-defined timestamp will
|
||||
// be stripped from user key for each index entry, and the
|
||||
// `first_internal_key` in `IndexValue` if it's included.
|
||||
bool persist_user_defined_timestamps_;
|
||||
// Set after ::Finish is called
|
||||
size_t index_size_ = 0;
|
||||
};
|
||||
|
@ -125,14 +137,21 @@ class ShortenedIndexBuilder : public IndexBuilder {
|
|||
const int index_block_restart_interval, const uint32_t format_version,
|
||||
const bool use_value_delta_encoding,
|
||||
BlockBasedTableOptions::IndexShorteningMode shortening_mode,
|
||||
bool include_first_key)
|
||||
: IndexBuilder(comparator),
|
||||
index_block_builder_(index_block_restart_interval,
|
||||
true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding),
|
||||
index_block_builder_without_seq_(index_block_restart_interval,
|
||||
true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding),
|
||||
bool include_first_key, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps)
|
||||
: IndexBuilder(comparator, ts_sz, persist_user_defined_timestamps),
|
||||
index_block_builder_(
|
||||
index_block_restart_interval, true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps, false /* is_user_key */),
|
||||
index_block_builder_without_seq_(
|
||||
index_block_restart_interval, true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps, true /* is_user_key */),
|
||||
use_value_delta_encoding_(use_value_delta_encoding),
|
||||
include_first_key_(include_first_key),
|
||||
shortening_mode_(shortening_mode) {
|
||||
|
@ -172,7 +191,19 @@ class ShortenedIndexBuilder : public IndexBuilder {
|
|||
auto sep = Slice(*last_key_in_current_block);
|
||||
|
||||
assert(!include_first_key_ || !current_block_first_internal_key_.empty());
|
||||
IndexValue entry(block_handle, current_block_first_internal_key_);
|
||||
// When UDT should not be persisted, the index block builders take care of
|
||||
// stripping UDT from the key, for the first internal key contained in the
|
||||
// IndexValue, we need to explicitly do the stripping here before passing
|
||||
// it to the block builders.
|
||||
std::string first_internal_key_buf;
|
||||
Slice first_internal_key = current_block_first_internal_key_;
|
||||
if (!current_block_first_internal_key_.empty() && ts_sz_ > 0 &&
|
||||
!persist_user_defined_timestamps_) {
|
||||
StripTimestampFromInternalKey(&first_internal_key_buf,
|
||||
current_block_first_internal_key_, ts_sz_);
|
||||
first_internal_key = first_internal_key_buf;
|
||||
}
|
||||
IndexValue entry(block_handle, first_internal_key);
|
||||
std::string encoded_entry;
|
||||
std::string delta_encoded_entry;
|
||||
entry.EncodeTo(&encoded_entry, include_first_key_, nullptr);
|
||||
|
@ -185,6 +216,16 @@ class ShortenedIndexBuilder : public IndexBuilder {
|
|||
}
|
||||
last_encoded_handle_ = block_handle;
|
||||
const Slice delta_encoded_entry_slice(delta_encoded_entry);
|
||||
|
||||
// TODO(yuzhangyu): fix this when "FindShortInternalKeySuccessor"
|
||||
// optimization is available.
|
||||
// Timestamp aware comparator currently doesn't provide override for
|
||||
// "FindShortInternalKeySuccessor" optimization. So the actual
|
||||
// last key in current block is used as the key for indexing the current
|
||||
// block. As a result, when UDTs should not be persisted, it's safe to strip
|
||||
// away the UDT from key in index block as data block does the same thing.
|
||||
// What are the implications if a "FindShortInternalKeySuccessor"
|
||||
// optimization is provided.
|
||||
index_block_builder_.Add(sep, encoded_entry, &delta_encoded_entry_slice);
|
||||
if (!seperator_is_key_plus_seq_) {
|
||||
index_block_builder_without_seq_.Add(ExtractUserKey(sep), encoded_entry,
|
||||
|
@ -270,11 +311,13 @@ class HashIndexBuilder : public IndexBuilder {
|
|||
const SliceTransform* hash_key_extractor,
|
||||
int index_block_restart_interval, int format_version,
|
||||
bool use_value_delta_encoding,
|
||||
BlockBasedTableOptions::IndexShorteningMode shortening_mode)
|
||||
: IndexBuilder(comparator),
|
||||
BlockBasedTableOptions::IndexShorteningMode shortening_mode, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps)
|
||||
: IndexBuilder(comparator, ts_sz, persist_user_defined_timestamps),
|
||||
primary_index_builder_(comparator, index_block_restart_interval,
|
||||
format_version, use_value_delta_encoding,
|
||||
shortening_mode, /* include_first_key */ false),
|
||||
shortening_mode, /* include_first_key */ false,
|
||||
ts_sz, persist_user_defined_timestamps),
|
||||
hash_key_extractor_(hash_key_extractor) {}
|
||||
|
||||
virtual void AddIndexEntry(std::string* last_key_in_current_block,
|
||||
|
@ -379,11 +422,14 @@ class PartitionedIndexBuilder : public IndexBuilder {
|
|||
static PartitionedIndexBuilder* CreateIndexBuilder(
|
||||
const ROCKSDB_NAMESPACE::InternalKeyComparator* comparator,
|
||||
const bool use_value_delta_encoding,
|
||||
const BlockBasedTableOptions& table_opt);
|
||||
const BlockBasedTableOptions& table_opt, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps);
|
||||
|
||||
explicit PartitionedIndexBuilder(const InternalKeyComparator* comparator,
|
||||
const BlockBasedTableOptions& table_opt,
|
||||
const bool use_value_delta_encoding);
|
||||
const bool use_value_delta_encoding,
|
||||
size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps);
|
||||
|
||||
virtual ~PartitionedIndexBuilder();
|
||||
|
||||
|
|
|
@ -65,6 +65,12 @@ class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader {
|
|||
return table_->get_rep()->table_options.cache_index_and_filter_blocks;
|
||||
}
|
||||
|
||||
bool user_defined_timestamps_persisted() const {
|
||||
assert(table_ != nullptr);
|
||||
assert(table_->get_rep() != nullptr);
|
||||
return table_->get_rep()->user_defined_timestamps_persisted;
|
||||
}
|
||||
|
||||
Status GetOrReadIndexBlock(bool no_io, GetContext* get_context,
|
||||
BlockCacheLookupContext* lookup_context,
|
||||
CachableEntry<Block>* index_block,
|
||||
|
|
|
@ -26,15 +26,22 @@ PartitionedFilterBlockBuilder::PartitionedFilterBlockBuilder(
|
|||
FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
|
||||
const bool use_value_delta_encoding,
|
||||
PartitionedIndexBuilder* const p_index_builder,
|
||||
const uint32_t partition_size)
|
||||
const uint32_t partition_size, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps)
|
||||
: FullFilterBlockBuilder(_prefix_extractor, whole_key_filtering,
|
||||
filter_bits_builder),
|
||||
index_on_filter_block_builder_(index_block_restart_interval,
|
||||
true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding),
|
||||
index_on_filter_block_builder_without_seq_(index_block_restart_interval,
|
||||
true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding),
|
||||
index_on_filter_block_builder_(
|
||||
index_block_restart_interval, true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps, false /* is_user_key */),
|
||||
index_on_filter_block_builder_without_seq_(
|
||||
index_block_restart_interval, true /*use_delta_encoding*/,
|
||||
use_value_delta_encoding,
|
||||
BlockBasedTableOptions::kDataBlockBinarySearch /* index_type */,
|
||||
0.75 /* data_block_hash_table_util_ratio */, ts_sz,
|
||||
persist_user_defined_timestamps, true /* is_user_key */),
|
||||
p_index_builder_(p_index_builder),
|
||||
keys_added_to_partition_(0),
|
||||
total_added_in_built_(0) {
|
||||
|
@ -270,7 +277,8 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
|
|||
table()->get_rep()->get_global_seqno(BlockType::kFilterPartitionIndex),
|
||||
&iter, kNullStats, true /* total_order_seek */,
|
||||
false /* have_first_key */, index_key_includes_seq(),
|
||||
index_value_is_full());
|
||||
index_value_is_full(), false /* block_contents_pinned */,
|
||||
user_defined_timestamps_persisted());
|
||||
iter.Seek(entry);
|
||||
if (UNLIKELY(!iter.Valid())) {
|
||||
// entry is larger than all the keys. However its prefix might still be
|
||||
|
@ -470,7 +478,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(
|
|||
comparator->user_comparator(),
|
||||
rep->get_global_seqno(BlockType::kFilterPartitionIndex), &biter,
|
||||
kNullStats, true /* total_order_seek */, false /* have_first_key */,
|
||||
index_key_includes_seq(), index_value_is_full());
|
||||
index_key_includes_seq(), index_value_is_full(),
|
||||
false /* block_contents_pinned */, user_defined_timestamps_persisted());
|
||||
// Index partitions are assumed to be consecuitive. Prefetch them all.
|
||||
// Read the first block offset
|
||||
biter.SeekToFirst();
|
||||
|
@ -550,4 +559,10 @@ bool PartitionedFilterBlockReader::index_value_is_full() const {
|
|||
return table()->get_rep()->index_value_is_full;
|
||||
}
|
||||
|
||||
bool PartitionedFilterBlockReader::user_defined_timestamps_persisted() const {
|
||||
assert(table());
|
||||
assert(table()->get_rep());
|
||||
|
||||
return table()->get_rep()->user_defined_timestamps_persisted;
|
||||
}
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
|
|
@ -31,7 +31,8 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
|
|||
FilterBitsBuilder* filter_bits_builder, int index_block_restart_interval,
|
||||
const bool use_value_delta_encoding,
|
||||
PartitionedIndexBuilder* const p_index_builder,
|
||||
const uint32_t partition_size);
|
||||
const uint32_t partition_size, size_t ts_sz,
|
||||
const bool persist_user_defined_timestamps);
|
||||
|
||||
virtual ~PartitionedFilterBlockBuilder();
|
||||
|
||||
|
@ -172,6 +173,7 @@ class PartitionedFilterBlockReader
|
|||
const InternalKeyComparator* internal_comparator() const;
|
||||
bool index_key_includes_seq() const;
|
||||
bool index_value_is_full() const;
|
||||
bool user_defined_timestamps_persisted() const;
|
||||
|
||||
protected:
|
||||
// For partition blocks pinned in cache. Can be a subset of blocks
|
||||
|
|
|
@ -57,7 +57,8 @@ class MyPartitionedFilterBlockReader : public PartitionedFilterBlockReader {
|
|||
|
||||
class PartitionedFilterBlockTest
|
||||
: public testing::Test,
|
||||
virtual public ::testing::WithParamInterface<uint32_t> {
|
||||
virtual public ::testing::WithParamInterface<
|
||||
std::tuple<uint32_t, test::UserDefinedTimestampTestMode>> {
|
||||
public:
|
||||
Options options_;
|
||||
ImmutableOptions ioptions_;
|
||||
|
@ -67,38 +68,64 @@ class PartitionedFilterBlockTest
|
|||
std::unique_ptr<BlockBasedTable> table_;
|
||||
std::shared_ptr<Cache> cache_;
|
||||
int bits_per_key_;
|
||||
size_t ts_sz_;
|
||||
bool user_defined_timestamps_persisted_;
|
||||
|
||||
PartitionedFilterBlockTest()
|
||||
: ioptions_(options_),
|
||||
env_options_(options_),
|
||||
icomp_(options_.comparator),
|
||||
bits_per_key_(10) {
|
||||
PartitionedFilterBlockTest() : bits_per_key_(10) {
|
||||
auto udt_test_mode = std::get<1>(GetParam());
|
||||
if (test::IsUDTEnabled(udt_test_mode)) {
|
||||
options_.comparator = test::BytewiseComparatorWithU64TsWrapper();
|
||||
}
|
||||
ts_sz_ = options_.comparator->timestamp_size();
|
||||
user_defined_timestamps_persisted_ = test::ShouldPersistUDT(udt_test_mode);
|
||||
icomp_ = InternalKeyComparator(options_.comparator);
|
||||
env_options_ = EnvOptions(options_);
|
||||
ioptions_ = ImmutableOptions(options_);
|
||||
table_options_.filter_policy.reset(
|
||||
NewBloomFilterPolicy(bits_per_key_, false));
|
||||
table_options_.format_version = GetParam();
|
||||
table_options_.format_version = std::get<0>(GetParam());
|
||||
table_options_.index_block_restart_interval = 3;
|
||||
}
|
||||
|
||||
~PartitionedFilterBlockTest() override {}
|
||||
|
||||
const std::string keys[4] = {"afoo", "bar", "box", "hello"};
|
||||
const std::string missing_keys[2] = {"missing", "other"};
|
||||
static constexpr int kKeyNum = 4;
|
||||
static constexpr int kMissingKeyNum = 2;
|
||||
const std::string keys_without_ts[kKeyNum] = {"afoo", "bar", "box", "hello"};
|
||||
const std::string missing_keys_without_ts[kMissingKeyNum] = {"missing",
|
||||
"other"};
|
||||
|
||||
std::vector<std::string> PrepareKeys(const std::string* orig_keys,
|
||||
int number_of_keys) {
|
||||
std::vector<std::string> user_keys;
|
||||
if (ts_sz_ == 0) {
|
||||
user_keys.assign(orig_keys, orig_keys + number_of_keys);
|
||||
} else {
|
||||
for (int i = 0; i < number_of_keys; i++) {
|
||||
std::string key_with_ts;
|
||||
AppendKeyWithMinTimestamp(&key_with_ts, orig_keys[i], ts_sz_);
|
||||
user_keys.push_back(std::move(key_with_ts));
|
||||
}
|
||||
}
|
||||
return user_keys;
|
||||
}
|
||||
|
||||
uint64_t MaxIndexSize() {
|
||||
int num_keys = sizeof(keys) / sizeof(*keys);
|
||||
uint64_t max_key_size = 0;
|
||||
for (int i = 1; i < num_keys; i++) {
|
||||
max_key_size =
|
||||
std::max(max_key_size, static_cast<uint64_t>(keys[i].size()));
|
||||
for (int i = 0; i < kKeyNum; i++) {
|
||||
// If UDT is enabled, the size of each key would be increased by a
|
||||
// timestamp size.
|
||||
max_key_size = std::max(
|
||||
max_key_size, static_cast<uint64_t>(keys_without_ts[i].size()) +
|
||||
ts_sz_ * sizeof(static_cast<unsigned char>(0)));
|
||||
}
|
||||
uint64_t max_index_size = num_keys * (max_key_size + 8 /*handle*/);
|
||||
uint64_t max_index_size = kKeyNum * (max_key_size + 8 /*handle*/);
|
||||
return max_index_size;
|
||||
}
|
||||
|
||||
uint64_t MaxFilterSize() {
|
||||
int num_keys = sizeof(keys) / sizeof(*keys);
|
||||
// General, rough over-approximation
|
||||
return num_keys * bits_per_key_ + (CACHE_LINE_SIZE * 8 + /*metadata*/ 5);
|
||||
return kKeyNum * bits_per_key_ + (CACHE_LINE_SIZE * 8 + /*metadata*/ 5);
|
||||
}
|
||||
|
||||
uint64_t last_offset = 10;
|
||||
|
@ -112,7 +139,8 @@ class PartitionedFilterBlockTest
|
|||
PartitionedIndexBuilder* NewIndexBuilder() {
|
||||
const bool kValueDeltaEncoded = true;
|
||||
return PartitionedIndexBuilder::CreateIndexBuilder(
|
||||
&icomp_, !kValueDeltaEncoded, table_options_);
|
||||
&icomp_, !kValueDeltaEncoded, table_options_, ts_sz_,
|
||||
user_defined_timestamps_persisted_);
|
||||
}
|
||||
|
||||
PartitionedFilterBlockBuilder* NewBuilder(
|
||||
|
@ -131,7 +159,8 @@ class PartitionedFilterBlockTest
|
|||
BloomFilterPolicy::GetBuilderFromContext(
|
||||
FilterBuildingContext(table_options_)),
|
||||
table_options_.index_block_restart_interval, !kValueDeltaEncoded,
|
||||
p_index_builder, partition_size);
|
||||
p_index_builder, partition_size, ts_sz_,
|
||||
user_defined_timestamps_persisted_);
|
||||
}
|
||||
|
||||
PartitionedFilterBlockReader* NewReader(
|
||||
|
@ -152,7 +181,8 @@ class PartitionedFilterBlockTest
|
|||
table_.reset(new MockedBlockBasedTable(
|
||||
new BlockBasedTable::Rep(ioptions_, env_options_, table_options_,
|
||||
icomp_, skip_filters, file_size, level,
|
||||
immortal_table),
|
||||
immortal_table,
|
||||
user_defined_timestamps_persisted_),
|
||||
pib));
|
||||
BlockContents contents(slice);
|
||||
CachableEntry<Block> block(
|
||||
|
@ -169,38 +199,41 @@ class PartitionedFilterBlockTest
|
|||
NewReader(builder, pib));
|
||||
// Querying added keys
|
||||
const bool no_io = true;
|
||||
std::vector<std::string> keys = PrepareKeys(keys_without_ts, kKeyNum);
|
||||
for (auto key : keys) {
|
||||
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
ASSERT_TRUE(reader->KeyMayMatch(key, !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr,
|
||||
ReadOptions()));
|
||||
ASSERT_TRUE(reader->KeyMayMatch(
|
||||
StripTimestampFromUserKey(key, ts_sz_), !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr, ReadOptions()));
|
||||
}
|
||||
{
|
||||
// querying a key twice
|
||||
auto ikey = InternalKey(keys[0], 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
ASSERT_TRUE(reader->KeyMayMatch(keys[0], !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr,
|
||||
ReadOptions()));
|
||||
ASSERT_TRUE(reader->KeyMayMatch(
|
||||
StripTimestampFromUserKey(keys[0], ts_sz_), !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr, ReadOptions()));
|
||||
}
|
||||
// querying missing keys
|
||||
std::vector<std::string> missing_keys =
|
||||
PrepareKeys(missing_keys_without_ts, kMissingKeyNum);
|
||||
for (auto key : missing_keys) {
|
||||
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
if (empty) {
|
||||
ASSERT_TRUE(reader->KeyMayMatch(key, !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr,
|
||||
ReadOptions()));
|
||||
ASSERT_TRUE(reader->KeyMayMatch(
|
||||
StripTimestampFromUserKey(key, ts_sz_), !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr, ReadOptions()));
|
||||
} else {
|
||||
// assuming a good hash function
|
||||
ASSERT_FALSE(reader->KeyMayMatch(key, !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr,
|
||||
ReadOptions()));
|
||||
ASSERT_FALSE(reader->KeyMayMatch(
|
||||
StripTimestampFromUserKey(key, ts_sz_), !no_io, &ikey_slice,
|
||||
/*get_context=*/nullptr,
|
||||
/*lookup_context=*/nullptr, ReadOptions()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -210,17 +243,18 @@ class PartitionedFilterBlockTest
|
|||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
|
||||
NewBuilder(pib.get()));
|
||||
int i = 0;
|
||||
builder->Add(keys[i]);
|
||||
std::vector<std::string> keys = PrepareKeys(keys_without_ts, kKeyNum);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i], keys[i + 1]);
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i], keys[i + 1]);
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i], keys[i + 1]);
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i]);
|
||||
|
||||
VerifyReader(builder.get(), pib.get());
|
||||
|
@ -231,16 +265,17 @@ class PartitionedFilterBlockTest
|
|||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
|
||||
NewBuilder(pib.get(), prefix_extractor));
|
||||
std::vector<std::string> keys = PrepareKeys(keys_without_ts, kKeyNum);
|
||||
int i = 0;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i], keys[i + 1]);
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i]);
|
||||
|
||||
VerifyReader(builder.get(), pib.get(), prefix_extractor);
|
||||
|
@ -250,15 +285,16 @@ class PartitionedFilterBlockTest
|
|||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
|
||||
NewBuilder(pib.get()));
|
||||
std::vector<std::string> keys = PrepareKeys(keys_without_ts, kKeyNum);
|
||||
int i = 0;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
i++;
|
||||
builder->Add(keys[i]);
|
||||
builder->Add(StripTimestampFromUserKey(keys[i], ts_sz_));
|
||||
CutABlock(pib.get(), keys[i]);
|
||||
|
||||
VerifyReader(builder.get(), pib.get());
|
||||
|
@ -299,10 +335,12 @@ class PartitionedFilterBlockTest
|
|||
};
|
||||
|
||||
// Format versions potentially intersting to partitioning
|
||||
INSTANTIATE_TEST_CASE_P(FormatVersions, PartitionedFilterBlockTest,
|
||||
testing::ValuesIn(std::set<uint32_t>{
|
||||
2, 3, 4, test::kDefaultFormatVersion,
|
||||
kLatestFormatVersion}));
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
FormatVersions, PartitionedFilterBlockTest,
|
||||
testing::Combine(testing::ValuesIn(std::set<uint32_t>{
|
||||
2, 3, 4, test::kDefaultFormatVersion,
|
||||
kLatestFormatVersion}),
|
||||
testing::ValuesIn(test::GetUDTTestModes())));
|
||||
|
||||
TEST_P(PartitionedFilterBlockTest, EmptyBuilder) {
|
||||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
|
@ -337,12 +375,14 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
|||
std::unique_ptr<PartitionedIndexBuilder> pib(NewIndexBuilder());
|
||||
std::unique_ptr<PartitionedFilterBlockBuilder> builder(
|
||||
NewBuilder(pib.get(), prefix_extractor.get()));
|
||||
const std::string pkeys[3] = {"p-key10", "p-key20", "p-key30"};
|
||||
builder->Add(pkeys[0]);
|
||||
const std::string pkeys_without_ts[3] = {"p-key10", "p-key20", "p-key30"};
|
||||
std::vector<std::string> pkeys =
|
||||
PrepareKeys(pkeys_without_ts, 3 /* number_of_keys */);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[0], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[0], pkeys[1]);
|
||||
builder->Add(pkeys[1]);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[1], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[1], pkeys[2]);
|
||||
builder->Add(pkeys[2]);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[2], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[2]);
|
||||
std::unique_ptr<PartitionedFilterBlockReader> reader(
|
||||
NewReader(builder.get(), pib.get()));
|
||||
|
@ -356,7 +396,10 @@ TEST_P(PartitionedFilterBlockTest, SamePrefixInMultipleBlocks) {
|
|||
ReadOptions()));
|
||||
}
|
||||
// Non-existent keys but with the same prefix
|
||||
const std::string pnonkeys[4] = {"p-key9", "p-key11", "p-key21", "p-key31"};
|
||||
const std::string pnonkeys_without_ts[4] = {"p-key9", "p-key11", "p-key21",
|
||||
"p-key31"};
|
||||
std::vector<std::string> pnonkeys =
|
||||
PrepareKeys(pnonkeys_without_ts, 4 /* number_of_keys */);
|
||||
for (auto key : pnonkeys) {
|
||||
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
|
@ -381,23 +424,25 @@ TEST_P(PartitionedFilterBlockTest, PrefixInWrongPartitionBug) {
|
|||
// In the bug, searching for prefix "p3" on an index with format version 3,
|
||||
// will give the key "p3" and the partition of the keys that are <= p3, i.e.,
|
||||
// p2-keys, where the filter for prefix "p3" does not exist.
|
||||
const std::string pkeys[] = {"p1-key1", "p2-key2", "p3-key3", "p4-key3",
|
||||
"p5-key3"};
|
||||
builder->Add(pkeys[0]);
|
||||
const std::string pkeys_without_ts[] = {"p1-key1", "p2-key2", "p3-key3",
|
||||
"p4-key3", "p5-key3"};
|
||||
std::vector<std::string> pkeys =
|
||||
PrepareKeys(pkeys_without_ts, 5 /* number_of_keys */);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[0], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[0], pkeys[1]);
|
||||
builder->Add(pkeys[1]);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[1], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[1], pkeys[2]);
|
||||
builder->Add(pkeys[2]);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[2], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[2], pkeys[3]);
|
||||
builder->Add(pkeys[3]);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[3], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[3], pkeys[4]);
|
||||
builder->Add(pkeys[4]);
|
||||
builder->Add(StripTimestampFromUserKey(pkeys[4], ts_sz_));
|
||||
CutABlock(pib.get(), pkeys[4]);
|
||||
std::unique_ptr<PartitionedFilterBlockReader> reader(
|
||||
NewReader(builder.get(), pib.get()));
|
||||
for (auto key : pkeys) {
|
||||
auto prefix = prefix_extractor->Transform(key);
|
||||
auto ikey = InternalKey(prefix, 0, ValueType::kTypeValue);
|
||||
auto ikey = InternalKey(key, 0, ValueType::kTypeValue);
|
||||
const Slice ikey_slice = Slice(*ikey.rep());
|
||||
ASSERT_TRUE(reader->PrefixMayMatch(prefix,
|
||||
/*no_io=*/false, &ikey_slice,
|
||||
|
@ -416,7 +461,6 @@ TEST_P(PartitionedFilterBlockTest, OneBlockPerKey) {
|
|||
}
|
||||
|
||||
TEST_P(PartitionedFilterBlockTest, PartitionCount) {
|
||||
int num_keys = sizeof(keys) / sizeof(*keys);
|
||||
table_options_.metadata_block_size =
|
||||
std::max(MaxIndexSize(), MaxFilterSize());
|
||||
int partitions = TestBlockPerKey();
|
||||
|
@ -424,7 +468,7 @@ TEST_P(PartitionedFilterBlockTest, PartitionCount) {
|
|||
// A low number ensures cutting a block after each key
|
||||
table_options_.metadata_block_size = 1;
|
||||
partitions = TestBlockPerKey();
|
||||
ASSERT_EQ(partitions, num_keys - 1 /* last two keys make one flush */);
|
||||
ASSERT_EQ(partitions, kKeyNum - 1 /* last two keys make one flush */);
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -433,4 +477,4 @@ int main(int argc, char** argv) {
|
|||
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
}
|
|
@ -75,7 +75,8 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
|
|||
internal_comparator()->user_comparator(),
|
||||
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
|
||||
index_has_first_key(), index_key_includes_seq(),
|
||||
index_value_is_full()));
|
||||
index_value_is_full(), false /* block_contents_pinned */,
|
||||
user_defined_timestamps_persisted()));
|
||||
} else {
|
||||
ReadOptions ro;
|
||||
ro.fill_cache = read_options.fill_cache;
|
||||
|
@ -94,7 +95,8 @@ InternalIteratorBase<IndexValue>* PartitionIndexReader::NewIterator(
|
|||
internal_comparator()->user_comparator(),
|
||||
rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true,
|
||||
index_has_first_key(), index_key_includes_seq(),
|
||||
index_value_is_full()));
|
||||
index_value_is_full(), false /* block_contents_pinned */,
|
||||
user_defined_timestamps_persisted()));
|
||||
|
||||
it = new PartitionedIndexIterator(
|
||||
table(), ro, *internal_comparator(), std::move(index_iter),
|
||||
|
@ -140,7 +142,8 @@ Status PartitionIndexReader::CacheDependencies(
|
|||
index_block.GetValue()->NewIndexIterator(
|
||||
internal_comparator()->user_comparator(),
|
||||
rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true,
|
||||
index_has_first_key(), index_key_includes_seq(), index_value_is_full());
|
||||
index_has_first_key(), index_key_includes_seq(), index_value_is_full(),
|
||||
false /* block_contents_pinned */, user_defined_timestamps_persisted());
|
||||
// Index partitions are assumed to be consecuitive. Prefetch them all.
|
||||
// Read the first block offset
|
||||
biter.SeekToFirst();
|
||||
|
|
|
@ -43,7 +43,7 @@ struct TableReaderOptions {
|
|||
size_t _max_file_size_for_l0_meta_pin = 0,
|
||||
const std::string& _cur_db_session_id = "", uint64_t _cur_file_num = 0,
|
||||
UniqueId64x2 _unique_id = {}, SequenceNumber _largest_seqno = 0,
|
||||
uint64_t _tail_size = 0)
|
||||
uint64_t _tail_size = 0, bool _user_defined_timestamps_persisted = true)
|
||||
: ioptions(_ioptions),
|
||||
prefix_extractor(_prefix_extractor),
|
||||
env_options(_env_options),
|
||||
|
@ -59,7 +59,8 @@ struct TableReaderOptions {
|
|||
cur_file_num(_cur_file_num),
|
||||
unique_id(_unique_id),
|
||||
block_protection_bytes_per_key(_block_protection_bytes_per_key),
|
||||
tail_size(_tail_size) {}
|
||||
tail_size(_tail_size),
|
||||
user_defined_timestamps_persisted(_user_defined_timestamps_persisted) {}
|
||||
|
||||
const ImmutableOptions& ioptions;
|
||||
const std::shared_ptr<const SliceTransform>& prefix_extractor;
|
||||
|
@ -93,6 +94,9 @@ struct TableReaderOptions {
|
|||
uint8_t block_protection_bytes_per_key;
|
||||
|
||||
uint64_t tail_size;
|
||||
|
||||
// Whether the key in the table contains user-defined timestamps.
|
||||
bool user_defined_timestamps_persisted;
|
||||
};
|
||||
|
||||
struct TableBuilderOptions {
|
||||
|
|
Loading…
Reference in New Issue