Trim readahead size based on prefix during prefix scan (#13040)

Summary:
**Context/Summary:**
During prefix scan, prefetched data blocks containing keys not in the same prefix as the `Seek()`'s key will be wasted when `ReadOptions::prefix_same_as_start = true` since they won't be returned to the user. This PR is to exclude those data blocks from being prefetched in a similar manner like trimming according to `ReadOptions::iterate_upper_bound`.

Bonus: refactoring to some existing prefetch test so they are easier to extend and read

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13040

Test Plan:
- New UT, integration to existing UTs
- Benchmark to ensure no regression from CPU due to more trimming logic
```
// Build DB with one sorted run under the same prefix
./db_bench --benchmarks=fillrandom --prefix_size=3 --keys_per_prefix=5000000 --num=5000000 --db=/dev/shm/db_bench --disable_auto_compactions=1
```
```
// Augment the existing db bench to call `Seek()` instead of `SeekToFirst()` in `void ReadSequential(){..}` to trigger the logic in this PR

+++ b/tools/db_bench_tool.cc
@@ -5900,7 +5900,12 @@ class Benchmark {
     Iterator* iter = db->NewIterator(options);
     int64_t i = 0;
     int64_t bytes = 0;
-    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
+
+    iter->SeekToFirst();
+    assert(iter->status().ok() && iter->Valid());
+    auto prefix = prefix_extractor_->Transform(iter->key());
+
+    for (iter->Seek(prefix); i < reads_ && iter->Valid(); iter->Next()) {
       bytes += iter->key().size() + iter->value().size();
       thread->stats.FinishedOps(nullptr, db, 1, kRead);
       ++i;
:
```
```
// Compare prefix scan performance
./db_bench --benchmarks=readseq[-X20] --prefix_size=3  --prefix_same_as_start=1 --auto_readahead_size=1 --cache_size=1 --use_existing_db=1 --db=/dev/shm/db_bench --disable_auto_compactions=1

// Before PR
readseq [AVG    20 runs] : 2449011 (± 50238) ops/sec;  270.9 (± 5.6) MB/sec
readseq [MEDIAN 20 runs] : 2499167 ops/sec;  276.5 MB/sec

// After PR  (regress 0.4 %)
readseq [AVG    20 runs] : 2439098 (± 42931) ops/sec;  269.8 (± 4.7) MB/sec
readseq [MEDIAN 20 runs] : 2460859 ops/sec;  272.2 MB/sec

```

- Stress test: randomly set `prefix_same_as_start` in `TestPrefixScan()`. Run below for a while
```
python3 tools/db_crashtest.py --simple blackbox --prefix_size=5 --prefixpercent=65 --WAL_size_limit_MB=1 --WAL_ttl_seconds=0 --acquire_snapshot_one_in=10000 --adm_policy=1 --advise_random_on_open=1 --allow_data_in_errors=True --allow_fallocate=0 --async_io=0 --avoid_flush_during_recovery=1 --avoid_flush_during_shutdown=1 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=1000 --batch_protection_bytes_per_key=8 --bgerror_resume_retry_interval=100 --block_align=1 --block_protection_bytes_per_key=4 --block_size=16384 --bloom_before_level=-1 --bloom_bits=3 --bottommost_compression_type=none --bottommost_file_compaction_delay=3600 --bytes_per_sync=262144 --cache_index_and_filter_blocks=0 --cache_index_and_filter_blocks_with_high_priority=0 --cache_size=33554432 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=0 --charge_table_reader=0 --check_multiget_consistency=0 --check_multiget_entity_consistency=0 --checkpoint_one_in=10000 --checksum_type=kxxHash --clear_column_family_one_in=0 --compact_files_one_in=1000 --compact_range_one_in=1000 --compaction_pri=4 --compaction_readahead_size=0 --compaction_style=2 --compaction_ttl=0 --compress_format_version=2 --compressed_secondary_cache_size=16777216 --compression_checksum=0 --compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 --compression_type=none --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --daily_offpeak_time_utc= --data_block_index_type=0  --db_write_buffer_size=8388608 --decouple_partitioned_filters=1 --default_temperature=kCold --default_write_temperature=kWarm --delete_obsolete_files_period_micros=21600000000 --delpercent=4 --delrangepercent=1 --destroy_db_initially=1 --detect_filter_construct_corruption=0 --disable_file_deletions_one_in=10000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=1 --enable_checksum_handoff=0 --enable_compaction_filter=0 --enable_custom_split_merge=1 --enable_do_not_compress_roles=1 --enable_index_compression=1 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=1 --enable_sst_partitioner_factory=0 --enable_thread_tracking=1 --enable_write_thread_adaptive_yield=0 --error_recovery_with_no_fault_injection=0 --exclude_wal_from_write_fault_injection=1 --fail_if_options_file_error=0 --fifo_allow_compaction=1 --file_checksum_impl=big --fill_cache=0 --flush_one_in=1000000 --format_version=6 --get_all_column_family_metadata_one_in=10000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=10000 --get_properties_of_all_tables_one_in=1000000 --get_property_one_in=1000000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0 --index_block_restart_interval=15 --index_shortening=2 --index_type=2 --ingest_external_file_one_in=0 --inplace_update_support=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100000 --last_level_temperature=kUnknown --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=1000000 --log2_keys_per_lock=10 --log_file_time_to_roll=0 --log_readahead_size=0 --long_running_snapshots=0 --low_pri_pool_ratio=0.5 --lowest_used_cache_tier=2 --manifest_preallocation_size=5120 --manual_wal_flush_one_in=1000 --mark_for_compaction_one_file_in=10 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=1000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=8 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16777216 --max_write_buffer_number=10 --max_write_buffer_size_to_maintain=4194304 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=0 --memtable_prefix_bloom_size_ratio=0.01 --memtable_protection_bytes_per_key=2 --memtable_whole_key_filtering=1 --memtablerep=skip_list --metadata_charge_policy=1 --metadata_read_fault_one_in=32 --metadata_write_fault_one_in=0 --min_write_buffer_number_to_merge=2 --mmap_read=0 --mock_direct_io=True --nooverwritepercent=1 --open_files=-1 --open_metadata_read_fault_one_in=0 --open_metadata_write_fault_one_in=8 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=40000 --optimize_filters_for_hits=0 --optimize_filters_for_memory=1 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --paranoid_memory_checks=0 --partition_filters=0 --partition_pinning=3 --pause_background_one_in=10000 --periodic_compaction_seconds=0 --prepopulate_block_cache=1 --preserve_internal_time_seconds=0 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=32  --readpercent=10 --recycle_log_file_num=0 --reopen=0 --report_bg_io_stats=0 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --secondary_cache_uri= --skip_stats_update_on_db_open=0 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=1048576 --sqfc_name=foo --sqfc_version=2 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --stats_dump_period_sec=10 --stats_history_buffer_size=1048576 --strict_bytes_per_sync=1 --subcompactions=4 --sync=0 --sync_fault_injection=1 --table_cache_numshardbits=-1 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --top_level_index_pinning=0 --uncache_aggressiveness=1 --universal_max_read_amp=10 --unpartitioned_pinning=0 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=1 --use_attribute_group=0 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=1 --use_full_merge_v1=1 --use_get_entity=0 --use_merge=1 --use_multi_cf_iterator=1 --use_multi_get_entity=0 --use_multiget=1 --use_put_entity_one_in=0 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_write_buffer_manager=1 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000 --verify_compression=0 --verify_db_one_in=100000 --verify_file_checksums_one_in=1000 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none --write_buffer_size=1048576 --write_dbid_to_manifest=1 --write_fault_one_in=1000 --writepercent=10
```

Reviewed By: anand1976

Differential Revision: D64367065

Pulled By: hx235

fbshipit-source-id: 5750c05ccc835c3e9dc81c961b76deaf30bd23c2
This commit is contained in:
Hui Xiao 2024-10-17 15:52:55 -07:00 committed by Facebook GitHub Bot
parent ac24f152a1
commit 58fc9d61b7
6 changed files with 349 additions and 134 deletions

View File

@ -1450,8 +1450,10 @@ class NonBatchedOpsStressTest : public StressTest {
Slice ub_slice;
ReadOptions ro_copy = read_opts;
// Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
// Randomly test with `iterate_upper_bound` and `prefix_same_as_start`
//
// Get the next prefix first and then see if we want to set it to be the
// upper bound. We'll use the next prefix in an assertion later on
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
// For half of the time, set the upper bound to the next prefix
ub_slice = Slice(upper_bound);
@ -1460,6 +1462,8 @@ class NonBatchedOpsStressTest : public StressTest {
ro_copy.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
}
} else if (options_.prefix_extractor && thread->rand.OneIn(2)) {
ro_copy.prefix_same_as_start = true;
}
std::string read_ts_str;
@ -1480,8 +1484,16 @@ class NonBatchedOpsStressTest : public StressTest {
uint64_t count = 0;
Status s;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// If upper or prefix bounds is specified, only keys of the target
// prefix should show up. Otherwise, we need to manual exit the loop when
// we see the first key that is not in the target prefix show up.
if (ro_copy.iterate_upper_bound != nullptr ||
ro_copy.prefix_same_as_start) {
assert(iter->key().starts_with(prefix));
} else if (!iter->key().starts_with(prefix)) {
break;
}
++count;
// When iter_start_ts is set, iterator exposes internal keys, including
@ -1535,7 +1547,14 @@ class NonBatchedOpsStressTest : public StressTest {
if (s.ok()) {
thread->stats.AddPrefixes(1, count);
} else if (injected_error_count == 0 || !IsErrorInjectedAndRetryable(s)) {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
fprintf(stderr,
"TestPrefixScan error: %s with ReadOptions::iterate_upper_bound: "
"%s, prefix_same_as_start: %s \n",
s.ToString().c_str(),
ro_copy.iterate_upper_bound
? ro_copy.iterate_upper_bound->ToString(true).c_str()
: "nullptr",
ro_copy.prefix_same_as_start ? "true" : "false");
thread->shared->SetVerificationFailure();
}

View File

@ -11,6 +11,7 @@
#ifdef GFLAGS
#include "tools/io_tracer_parser_tool.h"
#endif
#include "rocksdb/flush_block_policy.h"
#include "util/random.h"
namespace {
@ -121,6 +122,81 @@ class PrefetchTest
table_options.index_type =
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
}
void VerifyScan(ReadOptions& iter_ro, ReadOptions& cmp_iter_ro,
const Slice* seek_key, const Slice* iterate_upper_bound,
bool prefix_same_as_start) const {
assert(!(seek_key == nullptr));
iter_ro.iterate_upper_bound = cmp_iter_ro.iterate_upper_bound =
iterate_upper_bound;
iter_ro.prefix_same_as_start = cmp_iter_ro.prefix_same_as_start =
prefix_same_as_start;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(iter_ro));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_iter_ro));
iter->Seek(*seek_key);
cmp_iter->Seek(*seek_key);
while (iter->Valid() && cmp_iter->Valid()) {
if (iter->key() != cmp_iter->key()) {
// Error
ASSERT_TRUE(false);
}
iter->Next();
cmp_iter->Next();
}
ASSERT_TRUE(!cmp_iter->Valid() && !iter->Valid());
ASSERT_TRUE(cmp_iter->status().ok() && iter->status().ok());
}
void VerifySeekPrevSeek(ReadOptions& iter_ro, ReadOptions& cmp_iter_ro,
const Slice* seek_key,
const Slice* iterate_upper_bound,
bool prefix_same_as_start) {
assert(!(seek_key == nullptr));
iter_ro.iterate_upper_bound = cmp_iter_ro.iterate_upper_bound =
iterate_upper_bound;
iter_ro.prefix_same_as_start = cmp_iter_ro.prefix_same_as_start =
prefix_same_as_start;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(iter_ro));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_iter_ro));
// Seek
cmp_iter->Seek(*seek_key);
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Seek(*seek_key);
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
// Prev op should pass
cmp_iter->Prev();
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Prev();
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
// Reseek would follow as usual
cmp_iter->Seek(*seek_key);
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Seek(*seek_key);
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
}
};
INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
@ -1262,6 +1338,8 @@ TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
Options options;
SetGenericOptions(env.get(), /*use_direct_io=*/false, options);
options.statistics = CreateDBStatistics();
const std::string prefix = "my_key_";
options.prefix_extractor.reset(NewFixedPrefixTransform(prefix.size()));
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
@ -1272,8 +1350,9 @@ TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
Random rnd(309);
WriteBatch batch;
// Create the DB with keys from "my_key_aaaaaaaaaa" to "my_key_zzzzzzzzzz"
for (int i = 0; i < 26; i++) {
std::string key = "my_key_";
std::string key = prefix;
for (int j = 0; j < 10; j++) {
key += char('a' + i);
@ -1282,9 +1361,9 @@ TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = "my_key_a";
std::string start_key = prefix + "a";
std::string end_key = "my_key_";
std::string end_key = prefix;
for (int j = 0; j < 10; j++) {
end_key += char('a' + 25);
}
@ -1309,32 +1388,30 @@ TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
{
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
iter->Seek("my_key_bbb");
iter->Seek(prefix + "bbb");
ASSERT_TRUE(iter->Valid());
iter->Seek("my_key_ccccccccc");
iter->Seek(prefix + "ccccccccc");
ASSERT_TRUE(iter->Valid());
iter->Seek("my_key_ddd");
iter->Seek(prefix + "ddd");
ASSERT_TRUE(iter->Valid());
iter->Seek("my_key_ddddddd");
iter->Seek(prefix + "ddddddd");
ASSERT_TRUE(iter->Valid());
iter->Seek("my_key_e");
iter->Seek(prefix + "e");
ASSERT_TRUE(iter->Valid());
iter->Seek("my_key_eeeee");
iter->Seek(prefix + "eeeee");
ASSERT_TRUE(iter->Valid());
iter->Seek("my_key_eeeeeeeee");
iter->Seek(prefix + "eeeeeeeee");
ASSERT_TRUE(iter->Valid());
}
ReadOptions ropts;
ropts.auto_readahead_size = true;
ReadOptions cmp_ro;
cmp_ro.auto_readahead_size = false;
if (std::get<0>(GetParam())) {
ropts.readahead_size = cmp_ro.readahead_size = 32768;
@ -1345,61 +1422,31 @@ TEST_P(PrefetchTest, PrefetchWithBlockLookupAutoTuneTest) {
}
// With and without tuning readahead_size.
{
ASSERT_OK(options.statistics->Reset());
// Seek.
{
Slice ub = Slice("my_key_uuu");
Slice* ub_ptr = &ub;
cmp_ro.iterate_upper_bound = ub_ptr;
ropts.iterate_upper_bound = ub_ptr;
ropts.auto_readahead_size = true;
cmp_ro.auto_readahead_size = false;
ASSERT_OK(options.statistics->Reset());
// Seek with a upper bound
const std::string seek_key_str = prefix + "aaa";
const Slice seek_key(seek_key_str);
const std::string ub_str = prefix + "uuu";
const Slice ub(ub_str);
VerifyScan(ropts /* iter_ro */, cmp_ro /* cmp_iter_ro */,
&seek_key /* seek_key */, &ub /* iterate_upper_bound */,
false /* prefix_same_as_start */);
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ropts));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_ro));
// Seek with a new seek key and upper bound
const std::string seek_key_new_str = prefix + "v";
const Slice seek_key_new(seek_key_new_str);
const std::string ub_new_str = prefix + "y";
const Slice ub_new(ub_new_str);
VerifyScan(ropts /* iter_ro */, cmp_ro /* cmp_iter_ro */,
&seek_key_new /* seek_key */, &ub_new /* iterate_upper_bound */,
false /* prefix_same_as_start */);
Slice seek_key = Slice("my_key_aaa");
iter->Seek(seek_key);
cmp_iter->Seek(seek_key);
while (iter->Valid() && cmp_iter->Valid()) {
if (iter->key() != cmp_iter->key()) {
// Error
ASSERT_TRUE(false);
}
iter->Next();
cmp_iter->Next();
}
ASSERT_OK(cmp_iter->status());
ASSERT_OK(iter->status());
}
// Reseek with new upper_bound_iterator.
{
Slice ub = Slice("my_key_y");
ropts.iterate_upper_bound = &ub;
cmp_ro.iterate_upper_bound = &ub;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ropts));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_ro));
Slice reseek_key = Slice("my_key_v");
iter->Seek(reseek_key);
cmp_iter->Seek(reseek_key);
while (iter->Valid() && cmp_iter->Valid()) {
if (iter->key() != cmp_iter->key()) {
// Error
ASSERT_TRUE(false);
}
iter->Next();
cmp_iter->Next();
}
ASSERT_OK(cmp_iter->status());
ASSERT_OK(iter->status());
}
}
// Seek with no upper bound, prefix_same_as_start = true
VerifyScan(ropts /* iter_ro */, cmp_ro /* cmp_iter_ro */,
&seek_key /* seek_key */, nullptr /* iterate_upper_bound */,
true /* prefix_same_as_start */);
Close();
}
}
@ -1418,6 +1465,8 @@ TEST_F(PrefetchTest, PrefetchWithBlockLookupAutoTuneWithPrev) {
Options options;
SetGenericOptions(env.get(), /*use_direct_io=*/false, options);
options.statistics = CreateDBStatistics();
const std::string prefix = "my_key_";
options.prefix_extractor.reset(NewFixedPrefixTransform(prefix.size()));
BlockBasedTableOptions table_options;
SetBlockBasedTableOptions(table_options);
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024, 2);
@ -1432,7 +1481,7 @@ TEST_F(PrefetchTest, PrefetchWithBlockLookupAutoTuneWithPrev) {
WriteBatch batch;
for (int i = 0; i < 26; i++) {
std::string key = "my_key_";
std::string key = prefix;
for (int j = 0; j < 10; j++) {
key += char('a' + i);
@ -1441,9 +1490,9 @@ TEST_F(PrefetchTest, PrefetchWithBlockLookupAutoTuneWithPrev) {
}
ASSERT_OK(db_->Write(WriteOptions(), &batch));
std::string start_key = "my_key_a";
std::string start_key = prefix + "a";
std::string end_key = "my_key_";
std::string end_key = prefix;
for (int j = 0; j < 10; j++) {
end_key += char('a' + 25);
}
@ -1455,57 +1504,146 @@ TEST_F(PrefetchTest, PrefetchWithBlockLookupAutoTuneWithPrev) {
ReadOptions ropts;
ropts.auto_readahead_size = true;
ReadOptions cmp_readopts = ropts;
cmp_readopts.auto_readahead_size = false;
const std::string seek_key_str = prefix + "bbb";
const Slice seek_key(seek_key_str);
const std::string ub_key = prefix + "uuu";
const Slice ub(ub_key);
VerifySeekPrevSeek(ropts /* iter_ro */, cmp_readopts /* cmp_iter_ro */,
&seek_key /* seek_key */, &ub /* iterate_upper_bound */,
false /* prefix_same_as_start */);
VerifySeekPrevSeek(ropts /* iter_ro */, cmp_readopts /* cmp_iter_ro */,
&seek_key /* seek_key */,
nullptr /* iterate_upper_bound */,
true /* prefix_same_as_start */);
Close();
}
class PrefetchTrimReadaheadTestParam
: public DBTestBase,
public ::testing::WithParamInterface<
std::tuple<BlockBasedTableOptions::IndexShorteningMode, bool>> {
public:
const std::string kPrefix = "a_prefix_";
Random rnd = Random(309);
PrefetchTrimReadaheadTestParam()
: DBTestBase("prefetch_trim_readahead_test_param", true) {}
virtual void SetGenericOptions(Env* env, Options& options) {
options = CurrentOptions();
options.env = env;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
// To make all the data bocks fit in one file for testing purpose
options.write_buffer_size = 1024 * 1024 * 1024;
options.prefix_extractor.reset(NewFixedPrefixTransform(kPrefix.size()));
}
void SetBlockBasedTableOptions(BlockBasedTableOptions& table_options) {
table_options.no_block_cache = false;
table_options.index_shortening = std::get<0>(GetParam());
// To force keys with different prefixes are in different data blocks of the
// file for testing purpose
table_options.block_size = 1;
table_options.flush_block_policy_factory.reset(
new FlushBlockBySizePolicyFactory());
}
};
INSTANTIATE_TEST_CASE_P(
PrefetchTrimReadaheadTestParam, PrefetchTrimReadaheadTestParam,
::testing::Combine(
// Params are as follows -
// Param 0 - TableOptions::index_shortening
// Param 2 - ReadOptinos::auto_readahead_size
::testing::Values(
BlockBasedTableOptions::IndexShorteningMode::kNoShortening,
BlockBasedTableOptions::IndexShorteningMode::kShortenSeparators,
BlockBasedTableOptions::IndexShorteningMode::
kShortenSeparatorsAndSuccessor),
::testing::Bool()));
TEST_P(PrefetchTrimReadaheadTestParam, PrefixSameAsStart) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
const bool auto_readahead_size = std::get<1>(GetParam());
std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
FileSystem::Default(), false /* support_prefetch */,
true /* small_buffer_alignment */);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options;
SetGenericOptions(env.get(), options);
BlockBasedTableOptions table_optoins;
SetBlockBasedTableOptions(table_optoins);
options.table_factory.reset(NewBlockBasedTableFactory(table_optoins));
Status s = TryReopen(options);
ASSERT_OK(s);
// To create a DB with data block layout (denoted as "[...]" below ) as the
// following:
// ["a_prefix_0": random value]
// ["a_prefix_1": random value]
// ...
// ["a_prefix_9": random value]
// ["c_prefix_0": random value]
// ["d_prefix_1": random value]
// ...
// ["l_prefix_9": random value]
//
// We want to verify keys not with prefix "a_prefix_" are not prefetched due
// to trimming
WriteBatch prefix_batch;
for (int i = 0; i < 10; i++) {
std::string key = kPrefix + std::to_string(i);
ASSERT_OK(prefix_batch.Put(key, rnd.RandomString(100)));
}
ASSERT_OK(db_->Write(WriteOptions(), &prefix_batch));
WriteBatch diff_prefix_batch;
for (int i = 0; i < 10; i++) {
std::string diff_prefix = std::string(1, char('c' + i)) + kPrefix.substr(1);
std::string key = diff_prefix + std::to_string(i);
ASSERT_OK(diff_prefix_batch.Put(key, rnd.RandomString(100)));
}
ASSERT_OK(db_->Write(WriteOptions(), &diff_prefix_batch));
ASSERT_OK(db_->Flush(FlushOptions()));
// To verify readahead is trimmed based on prefix by checking the counter
// READAHEAD_TRIMMED
ReadOptions ro;
ro.prefix_same_as_start = true;
ro.auto_readahead_size = auto_readahead_size;
// Set a large readahead size to introduce readahead waste when without
// trimming based on prefix
ro.readahead_size = 1024 * 1024 * 1024;
ASSERT_OK(options.statistics->Reset());
{
// Seek.
Slice ub = Slice("my_key_uuu");
Slice* ub_ptr = &ub;
ropts.iterate_upper_bound = ub_ptr;
ropts.auto_readahead_size = true;
ReadOptions cmp_readopts = ropts;
cmp_readopts.auto_readahead_size = false;
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ropts));
auto cmp_iter = std::unique_ptr<Iterator>(db_->NewIterator(cmp_readopts));
Slice seek_key = Slice("my_key_bbb");
{
cmp_iter->Seek(seek_key);
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Seek(seek_key);
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key(), cmp_iter->key());
auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
for (iter->Seek(kPrefix); iter->status().ok() && iter->Valid();
iter->Next()) {
}
}
// Prev op should pass with auto tuning of readahead_size.
{
cmp_iter->Prev();
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
auto readahead_trimmed =
options.statistics->getTickerCount(READAHEAD_TRIMMED);
iter->Prev();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), cmp_iter->key());
}
// Reseek would follow as usual.
{
cmp_iter->Seek(seek_key);
ASSERT_TRUE(cmp_iter->Valid());
ASSERT_OK(cmp_iter->status());
iter->Seek(seek_key);
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), cmp_iter->key());
}
if (auto_readahead_size) {
ASSERT_GT(readahead_trimmed, 0);
} else {
ASSERT_EQ(readahead_trimmed, 0);
}
Close();
}

View File

@ -1912,10 +1912,25 @@ struct ReadOptions {
std::function<bool(const TableProperties&)> table_filter;
// If auto_readahead_size is set to true, it will auto tune the readahead_size
// during scans internally.
// For this feature to enabled, iterate_upper_bound must also be specified.
// during scans internally based on block cache data when block cache is
// enabled, iteration upper bound when `iterate_upper_bound != nullptr` and
// prefix when `prefix_same_as_start == true`
//
// NOTE: - Recommended for forward Scans only.
// Besides enabling block cache, it
// also requires `iterate_upper_bound != nullptr` or `prefix_same_as_start ==
// true` for this option to take effect
//
// To be specific, it does the following:
// (1) When `iterate_upper_bound`
// is specified, trim the readahead so the readahead does not exceed iteration
// upper bound
// (2) When `prefix_same_as_start` is set to true, trim the
// readahead so data blocks containing keys that are not in the same prefix as
// the seek key in `Seek()` are not prefetched
// - Limition: `Seek(key)` instead of `SeekToFirst()` needs to be called in
// order for this trimming to take effect
//
// NOTE: - Used for forward Scans only.
// - If there is a backward scans, this option will be
// disabled internally and won't be enabled again if the forward scan
// is issued again.

View File

@ -35,6 +35,17 @@ void BlockBasedTableIterator::SeekSecondPass(const Slice* target) {
void BlockBasedTableIterator::SeekImpl(const Slice* target,
bool async_prefetch) {
// TODO(hx235): set `seek_key_prefix_for_readahead_trimming_`
// even when `target == nullptr` that is when `SeekToFirst()` is called
if (target != nullptr && prefix_extractor_ &&
read_options_.prefix_same_as_start) {
const Slice& seek_user_key = ExtractUserKey(*target);
seek_key_prefix_for_readahead_trimming_ =
prefix_extractor_->InDomain(seek_user_key)
? prefix_extractor_->Transform(seek_user_key).ToString()
: "";
}
bool is_first_pass = !async_read_in_progress_;
if (!is_first_pass) {
@ -44,9 +55,9 @@ void BlockBasedTableIterator::SeekImpl(const Slice* target,
ResetBlockCacheLookupVar();
bool autotune_readaheadsize = is_first_pass &&
read_options_.auto_readahead_size &&
read_options_.iterate_upper_bound;
bool autotune_readaheadsize =
is_first_pass && read_options_.auto_readahead_size &&
(read_options_.iterate_upper_bound || read_options_.prefix_same_as_start);
if (autotune_readaheadsize &&
table_->get_rep()->table_options.block_cache.get() &&
@ -778,7 +789,7 @@ void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize(
size_t footer = table_->get_rep()->footer.GetBlockTrailerSize();
if (read_curr_block && !DoesContainBlockHandles() &&
IsNextBlockOutOfBound()) {
IsNextBlockOutOfReadaheadBound()) {
end_offset = index_iter_->value().handle.offset() + footer +
index_iter_->value().handle.size();
return;
@ -850,7 +861,7 @@ void BlockBasedTableIterator::BlockCacheLookupForReadAheadSize(
// If curr block's index key >= iterate_upper_bound, it
// means all the keys in next block or above are out of
// bound.
if (IsNextBlockOutOfBound()) {
if (IsNextBlockOutOfReadaheadBound()) {
is_index_out_of_bound_ = true;
break;
}

View File

@ -353,6 +353,11 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
// is used to disable the lookup.
IterDirection direction_ = IterDirection::kForward;
// The prefix of the key called with SeekImpl().
// This is for readahead trimming so no data blocks containing keys of a
// different prefix are prefetched
std::string seek_key_prefix_for_readahead_trimming_ = "";
void SeekSecondPass(const Slice* target);
// If `target` is null, seek to first.
@ -408,15 +413,41 @@ class BlockBasedTableIterator : public InternalIteratorBase<Slice> {
ClearBlockHandles();
}
bool IsNextBlockOutOfBound() {
bool IsNextBlockOutOfReadaheadBound() {
const Slice& index_iter_user_key = index_iter_->user_key();
// If curr block's index key >= iterate_upper_bound, it means all the keys
// in next block or above are out of bound.
return (user_comparator_.CompareWithoutTimestamp(
index_iter_->user_key(),
/*a_has_ts=*/true, *read_options_.iterate_upper_bound,
/*b_has_ts=*/false) >= 0
? true
: false);
bool out_of_upper_bound =
read_options_.iterate_upper_bound != nullptr &&
(user_comparator_.CompareWithoutTimestamp(
index_iter_user_key,
/*a_has_ts=*/true, *read_options_.iterate_upper_bound,
/*b_has_ts=*/false) >= 0
? true
: false);
if (out_of_upper_bound) {
return true;
}
// If curr block's index key has a different prefix from the seek key's, it
// means all the keys in next block or above has a different prefix from the
// seek key's.
bool out_of_prefix_bound =
(read_options_.prefix_same_as_start &&
!seek_key_prefix_for_readahead_trimming_.empty() &&
(prefix_extractor_->InDomain(index_iter_user_key)
? (prefix_extractor_->Transform(index_iter_user_key)
.compare(seek_key_prefix_for_readahead_trimming_) != 0)
: user_comparator_.CompareWithoutTimestamp(
index_iter_user_key,
/*a_has_ts=*/true, seek_key_prefix_for_readahead_trimming_,
/*b_has_ts=*/false) > 0));
if (out_of_prefix_bound) {
return true;
}
return false;
}
void ClearBlockHandles() {

View File

@ -0,0 +1 @@
* Trim readahead_size during scans so data blocks containing keys that are not in the same prefix as the seek key in `Seek()` are not prefetched when `ReadOptions::auto_readahead_size=true` (default value) and `ReadOptions::prefix_same_as_start = true`