From 7c98a2d1309ebab27d04962c33d256f53208474f Mon Sep 17 00:00:00 2001
From: Andrew Ryan Chang
Date: Fri, 1 Nov 2024 13:22:27 -0700
Subject: [PATCH] Update MultiGet to respect the strict_capacity_limit block cache option (#13104)

Summary:
There is a `strict_capacity_limit` option which imposes a hard memory limit
on the block cache. When the block cache is enabled, every read request is
serviced from the block cache; if the required block is missing, it is first
inserted into the cache. If `strict_capacity_limit` is `true` and the limit
has been reached, `Get` and `MultiGet` requests should fail. However, this
currently does not happen for `MultiGet`.

I updated `MultiGet` to explicitly check the status returned by
`MaybeReadBlockAndLoadToCache`, so that the status does not get overwritten
later.

Thank you anand1976 for the problem explanation.
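As a rough illustration of the user-facing behavior (a hedged sketch, not code
from this PR; the DB path and keys are placeholders), this is how a caller
opts into the hard limit via `NewLRUCache` and can observe the resulting
status from `Get` and `MultiGet`:

```
#include <cassert>
#include <memory>
#include <string>
#include <vector>

#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

int main() {
  // Block cache with a hard 4 KB limit; insertions fail once the limit is hit.
  std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(
      4 * 1024, /*num_shard_bits=*/0, /*strict_capacity_limit=*/true);

  rocksdb::BlockBasedTableOptions table_opts;
  table_opts.block_cache = cache;

  rocksdb::Options options;
  options.create_if_missing = true;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_opts));

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/strict_cache_demo", &db);
  assert(s.ok());

  // With the fix, both Get and MultiGet surface Status::MemoryLimit() when a
  // block cannot be inserted into the full cache, instead of silently
  // dropping the error for MultiGet.
  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "some_key", &value);
  if (s.IsMemoryLimit()) {
    // Handle the over-limit condition (e.g. retry or raise the capacity).
  }

  std::vector<rocksdb::Slice> keys{"k1", "k2"};
  std::vector<std::string> values;
  std::vector<rocksdb::Status> statuses =
      db->MultiGet(rocksdb::ReadOptions(), keys, &values);
  for (const rocksdb::Status& st : statuses) {
    if (st.IsMemoryLimit()) {
      // Previously this status could be lost; now it is propagated per key.
    }
  }

  delete db;
  return 0;
}
```

The new tests below assert the same condition through `Status::IsMemoryLimit()`
and the "Memory limit reached: Insert failed due to LRU cache being full"
message.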
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13104

Test Plan:
Added unit tests for both `Get` and `MultiGet` with `strict_capacity_limit`
set. Before the change, half of my unit test cases failed:
https://github.com/facebook/rocksdb/actions/runs/11604597524/job/32313608085?pr=13104.
After I added the check for the status returned by
`MaybeReadBlockAndLoadToCache`, they all pass.

I also ran these tests manually (I had to run `make clean` first):

```
make -j64 block_based_table_reader_test COMPILE_WITH_ASAN=1 ASSERT_STATUS_CHECKED=1
./block_based_table_reader_test --gtest_filter="*StrictCapacityLimitReaderTest.Get*"
./block_based_table_reader_test --gtest_filter="*StrictCapacityLimitReaderTest.MultiGet*"
```

Reviewed By: anand1976

Differential Revision: D65302470

Pulled By: archang19

fbshipit-source-id: 28dcc381e67e05a89fa9fc9607b4709976d6d90e
---
 .../block_based_table_reader_sync_and_async.h |   5 +-
 .../block_based_table_reader_test.cc          | 202 ++++++++++++++++++
 2 files changed, 206 insertions(+), 1 deletion(-)

diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h
index 608b3882d6..7ec152fc8e 100644
--- a/table/block_based/block_based_table_reader_sync_and_async.h
+++ b/table/block_based/block_based_table_reader_sync_and_async.h
@@ -303,12 +303,15 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
             /*lookup_context=*/nullptr, &serialized_block,
             /*async_read=*/false, /*use_block_cache_for_lookup=*/true);
+        if (!s.ok()) {
+          statuses[idx_in_batch] = s;
+          continue;
+        }

         // block_entry value could be null if no block cache is present, i.e
         // BlockBasedTableOptions::no_block_cache is true and no compressed
         // block cache is configured. In that case, fall
         // through and set up the block explicitly
         if (block_entry->GetValue() != nullptr) {
-          s.PermitUncheckedError();
           continue;
         }
       }
diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc
index 414d4b1f9f..4a18b6fcda 100644
--- a/table/block_based/block_based_table_reader_test.cc
+++ b/table/block_based/block_based_table_reader_test.cc
@@ -723,6 +723,199 @@ TEST_P(ChargeTableReaderTest, Basic) {
   }
 }

+class StrictCapacityLimitReaderTest : public BlockBasedTableReaderTest {
+ public:
+  StrictCapacityLimitReaderTest() : BlockBasedTableReaderTest() {}
+
+ protected:
+  void ConfigureTableFactory() override {
+    BlockBasedTableOptions table_options;
+
+    table_options.block_cache = std::make_shared<
+        TargetCacheChargeTrackingCache<CacheEntryRole::kBlockBasedTableReader>>(
+        (NewLRUCache(4 * 1024, 0 /* num_shard_bits */,
+                     true /* strict_capacity_limit */)));
+
+    table_options.cache_index_and_filter_blocks = false;
+    table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
+    table_options.partition_filters = true;
+    table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
+
+    options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
+  }
+};
+
+TEST_P(StrictCapacityLimitReaderTest, Get) {
+  // Test that we get error status when we exceed
+  // the strict_capacity_limit
+  Options options;
+  size_t ts_sz = options.comparator->timestamp_size();
+  std::vector<std::pair<std::string, std::string>> kv =
+      BlockBasedTableReaderBaseTest::GenerateKVMap(
+          2 /* num_block */, true /* mixed_with_human_readable_string_value */,
+          ts_sz, false);
+
+  std::string table_name = "StrictCapacityLimitReaderTest_Get" +
+                           CompressionTypeToString(compression_type_);
+
+  ImmutableOptions ioptions(options);
+  CreateTable(table_name, ioptions, compression_type_, kv);
+
+  std::unique_ptr<BlockBasedTable> table;
+  FileOptions foptions;
+  foptions.use_direct_reads = true;
+  InternalKeyComparator comparator(options.comparator);
+  NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
+                           true /* prefetch_index_and_filter_in_cache */,
+                           nullptr /* status */);
+
+  ReadOptions read_opts;
+  ASSERT_OK(
+      table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
+
+  bool hit_memory_limit = false;
+  for (size_t i = 0; i < kv.size(); i += 1) {
+    Slice key = kv[i].first;
+    Slice lkey = key;
+    std::string lookup_ikey;
+    // Reading the first entry in a block caches the whole block.
+    if (i % kEntriesPerBlock == 0) {
+      ASSERT_FALSE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
+    } else if (!hit_memory_limit) {
+      ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
+    }
+    PinnableSlice value;
+    GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
+                           GetContext::kNotFound, ExtractUserKey(key), &value,
+                           nullptr, nullptr, nullptr, nullptr,
+                           true /* do_merge */, nullptr, nullptr, nullptr,
+                           nullptr, nullptr, nullptr);
+    Status s = table->Get(read_opts, lkey, &get_context, nullptr);
+    if (!s.ok()) {
+      EXPECT_TRUE(s.IsMemoryLimit());
+      EXPECT_TRUE(s.ToString().find("Memory limit reached: Insert failed due "
+                                    "to LRU cache being full") !=
+                  std::string::npos);
+      hit_memory_limit = true;
+    } else {
+      ASSERT_EQ(value.ToString(), kv[i].second);
+      ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
+    }
+  }
+
+  ASSERT_TRUE(hit_memory_limit);
+}
+
+TEST_P(StrictCapacityLimitReaderTest, MultiGet) {
+  // Test that we get error status when we exceed
+  // the strict_capacity_limit
+  Options options;
+  ReadOptions read_opts;
+  std::string dummy_ts(sizeof(uint64_t), '\0');
+  Slice read_timestamp = dummy_ts;
+  if (udt_enabled_) {
+    options.comparator = test::BytewiseComparatorWithU64TsWrapper();
+    read_opts.timestamp = &read_timestamp;
+  }
+  options.persist_user_defined_timestamps = persist_udt_;
+  size_t ts_sz = options.comparator->timestamp_size();
+  std::vector<std::pair<std::string, std::string>> kv =
+      BlockBasedTableReaderBaseTest::GenerateKVMap(
+          2 /* num_block */, true /* mixed_with_human_readable_string_value */,
+          ts_sz);
+
+  // Prepare keys, values, and statuses for MultiGet.
+  autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
+  autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys_without_timestamps;
+  autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
+  autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
+  autovector<const std::string*, MultiGetContext::MAX_BATCH_SIZE>
+      expected_values;
+  {
+    const int step =
+        static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
+    auto it = kv.begin();
+    for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
+      keys.emplace_back(it->first);
+      if (ts_sz > 0) {
+        Slice ukey_without_ts =
+            ExtractUserKeyAndStripTimestamp(it->first, ts_sz);
+        keys_without_timestamps.push_back(ukey_without_ts);
+      } else {
+        keys_without_timestamps.emplace_back(ExtractUserKey(it->first));
+      }
+      values.emplace_back();
+      statuses.emplace_back();
+      expected_values.push_back(&(it->second));
+      std::advance(it, step);
+    }
+  }
+
+  std::string table_name = "StrictCapacityLimitReaderTest_MultiGet" +
+                           CompressionTypeToString(compression_type_);
+
+  ImmutableOptions ioptions(options);
+  CreateTable(table_name, ioptions, compression_type_, kv,
+              compression_parallel_threads_, compression_dict_bytes_);
+
+  std::unique_ptr<BlockBasedTable> table;
+  FileOptions foptions;
+  foptions.use_direct_reads = use_direct_reads_;
+  InternalKeyComparator comparator(options.comparator);
+  NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
+                           true /* bool prefetch_index_and_filter_in_cache */,
+                           nullptr /* status */, persist_udt_);
+
+  ASSERT_OK(
+      table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));
+
+  // Ensure that keys are not in cache before MultiGet.
+  for (auto& key : keys) {
+    ASSERT_FALSE(table->TEST_KeyInCache(read_opts, key.ToString()));
+  }
+
+  // Prepare MultiGetContext.
+  autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
+  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
+  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
+  for (size_t i = 0; i < keys.size(); ++i) {
+    get_context.emplace_back(options.comparator, nullptr, nullptr, nullptr,
+                             GetContext::kNotFound, ExtractUserKey(keys[i]),
+                             &values[i], nullptr, nullptr, nullptr, nullptr,
+                             true /* do_merge */, nullptr, nullptr, nullptr,
+                             nullptr, nullptr, nullptr);
+    key_context.emplace_back(nullptr, keys_without_timestamps[i], &values[i],
+                             nullptr, nullptr, &statuses.back());
+    key_context.back().get_context = &get_context.back();
+  }
+  for (auto& key_ctx : key_context) {
+    sorted_keys.emplace_back(&key_ctx);
+  }
+  MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, read_opts,
+                      fs_.get(), nullptr);
+
+  // Execute MultiGet.
+  MultiGetContext::Range range = ctx.GetMultiGetRange();
+  PerfContext* perf_ctx = get_perf_context();
+  perf_ctx->Reset();
+  table->MultiGet(read_opts, &range, nullptr);
+
+  ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count -
+                perf_ctx->filter_block_read_count -
+                perf_ctx->compression_dict_block_read_count,
+            1);
+  ASSERT_GE(perf_ctx->block_read_byte, 1);
+
+  bool hit_memory_limit = false;
+  for (const Status& status : statuses) {
+    if (!status.ok()) {
+      EXPECT_TRUE(status.IsMemoryLimit());
+      hit_memory_limit = true;
+    }
+  }
+  ASSERT_TRUE(hit_memory_limit);
+}
+
 class BlockBasedTableReaderTestVerifyChecksum
     : public BlockBasedTableReaderTest {
  public:
@@ -829,6 +1022,15 @@ INSTANTIATE_TEST_CASE_P(
     ::testing::Values(false), ::testing::ValuesIn(test::GetUDTTestModes()),
     ::testing::Values(1, 2), ::testing::Values(0, 4096),
     ::testing::Values(false, true)));
+INSTANTIATE_TEST_CASE_P(
+    StrictCapacityLimitReaderTest, StrictCapacityLimitReaderTest,
+    ::testing::Combine(
+        ::testing::ValuesIn(GetSupportedCompressions()), ::testing::Bool(),
+        ::testing::Values(
+            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch),
+        ::testing::Values(false), ::testing::ValuesIn(test::GetUDTTestModes()),
+        ::testing::Values(1, 2), ::testing::Values(0),
+        ::testing::Values(false, true)));
 INSTANTIATE_TEST_CASE_P(
     VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
     ::testing::Combine(