Update MultiGet to respect the strict_capacity_limit block cache option (#13104)

Summary:
There is a `strict_capacity_limit` option which imposes a hard memory limit on the block cache. When the block cache is enabled, every read request is serviced from the block cache. If the required block is missing from the cache, it is first read from the file and then inserted into the cache. If `strict_capacity_limit` is `true` and the limit has been reached, the `Get` and `MultiGet` requests should fail. However, currently this is not happening for `MultiGet`.

I updated `MultiGet` to explicitly check the returned status of `MaybeReadBlockAndLoadToCache`, so the status does not get overwritten later.

Thank you anand1976 for the problem explanation.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13104

Test Plan:
Added unit test for both `Get` and `MultiGet` with a `strict_capacity_limit` set.

Before the change, half of my unit test cases failed https://github.com/facebook/rocksdb/actions/runs/11604597524/job/32313608085?pr=13104. After I added the check for the status returned by `MaybeReadBlockAndLoadToCache`, they all pass.

I also ran these tests manually (I had to run `make clean` before):

```
make -j64 block_based_table_reader_test COMPILE_WITH_ASAN=1 ASSERT_STATUS_CHECKED=1

 ./block_based_table_reader_test --gtest_filter="*StrictCapacityLimitReaderTest.Get*"
 ./block_based_table_reader_test --gtest_filter="*StrictCapacityLimitReaderTest.MultiGet*"

```

Reviewed By: anand1976

Differential Revision: D65302470

Pulled By: archang19

fbshipit-source-id: 28dcc381e67e05a89fa9fc9607b4709976d6d90e
This commit is contained in:
Andrew Ryan Chang 2024-11-01 13:22:27 -07:00 committed by Facebook GitHub Bot
parent af2a36d2c7
commit 7c98a2d130
2 changed files with 206 additions and 1 deletions

View File

@ -303,12 +303,15 @@ DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks)
/*lookup_context=*/nullptr, &serialized_block,
/*async_read=*/false, /*use_block_cache_for_lookup=*/true);
if (!s.ok()) {
statuses[idx_in_batch] = s;
continue;
}
// block_entry value could be null if no block cache is present, i.e
// BlockBasedTableOptions::no_block_cache is true and no compressed
// block cache is configured. In that case, fall
// through and set up the block explicitly
if (block_entry->GetValue() != nullptr) {
s.PermitUncheckedError();
continue;
}
}

View File

@ -723,6 +723,199 @@ TEST_P(ChargeTableReaderTest, Basic) {
}
}
class StrictCapacityLimitReaderTest : public BlockBasedTableReaderTest {
public:
StrictCapacityLimitReaderTest() : BlockBasedTableReaderTest() {}
protected:
void ConfigureTableFactory() override {
BlockBasedTableOptions table_options;
table_options.block_cache = std::make_shared<
TargetCacheChargeTrackingCache<CacheEntryRole::kBlockBasedTableReader>>(
(NewLRUCache(4 * 1024, 0 /* num_shard_bits */,
true /* strict_capacity_limit */)));
table_options.cache_index_and_filter_blocks = false;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options.partition_filters = true;
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
}
};
TEST_P(StrictCapacityLimitReaderTest, Get) {
  // Verify that Get returns a MemoryLimit error once the block cache's
  // strict_capacity_limit is exhausted, and behaves normally before that.
  Options options;
  size_t ts_sz = options.comparator->timestamp_size();
  std::vector<std::pair<std::string, std::string>> kv =
      BlockBasedTableReaderBaseTest::GenerateKVMap(
          2 /* num_block */, true /* mixed_with_human_readable_string_value */,
          ts_sz, false);

  std::string table_name = "StrictCapacityLimitReaderTest_Get" +
                           CompressionTypeToString(compression_type_);

  ImmutableOptions ioptions(options);
  CreateTable(table_name, ioptions, compression_type_, kv);

  std::unique_ptr<BlockBasedTable> table;
  FileOptions foptions;
  foptions.use_direct_reads = true;
  InternalKeyComparator comparator(options.comparator);
  NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
                           true /* prefetch_index_and_filter_in_cache */,
                           nullptr /* status */);

  ReadOptions read_opts;
  ASSERT_OK(
      table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));

  bool reached_limit = false;
  for (size_t idx = 0; idx < kv.size(); ++idx) {
    Slice key = kv[idx].first;
    Slice lkey = key;
    std::string lookup_ikey;
    if (idx % kEntriesPerBlock == 0) {
      // First entry of a data block: the block has not been cached yet.
      ASSERT_FALSE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
    } else if (!reached_limit) {
      // Later entries hit the block cached by the earlier read (while the
      // cache still had room).
      ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
    }

    PinnableSlice value;
    GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
                           GetContext::kNotFound, ExtractUserKey(key), &value,
                           nullptr, nullptr, nullptr, nullptr,
                           true /* do_merge */, nullptr, nullptr, nullptr,
                           nullptr, nullptr, nullptr);

    Status s = table->Get(read_opts, lkey, &get_context, nullptr);
    if (s.ok()) {
      // Successful read: correct value, and the block is now cached.
      ASSERT_EQ(value.ToString(), kv[idx].second);
      ASSERT_TRUE(table->TEST_KeyInCache(read_opts, lkey.ToString()));
    } else {
      // Failure must be the strict-capacity-limit error, nothing else.
      EXPECT_TRUE(s.IsMemoryLimit());
      EXPECT_TRUE(s.ToString().find("Memory limit reached: Insert failed due "
                                    "to LRU cache being full") !=
                  std::string::npos);
      reached_limit = true;
    }
  }
  // The 4 KiB cache is too small for the generated table, so the limit
  // must have been hit at least once.
  ASSERT_TRUE(reached_limit);
}
TEST_P(StrictCapacityLimitReaderTest, MultiGet) {
  // Test that we get error status when we exceed
  // the strict_capacity_limit
  Options options;
  ReadOptions read_opts;
  std::string dummy_ts(sizeof(uint64_t), '\0');
  Slice read_timestamp = dummy_ts;
  if (udt_enabled_) {
    options.comparator = test::BytewiseComparatorWithU64TsWrapper();
    read_opts.timestamp = &read_timestamp;
  }
  options.persist_user_defined_timestamps = persist_udt_;
  size_t ts_sz = options.comparator->timestamp_size();
  std::vector<std::pair<std::string, std::string>> kv =
      BlockBasedTableReaderBaseTest::GenerateKVMap(
          2 /* num_block */, true /* mixed_with_human_readable_string_value */,
          ts_sz);

  // Prepare keys, values, and statuses for MultiGet.
  autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
  autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys_without_timestamps;
  autovector<PinnableSlice, MultiGetContext::MAX_BATCH_SIZE> values;
  autovector<Status, MultiGetContext::MAX_BATCH_SIZE> statuses;
  autovector<const std::string*, MultiGetContext::MAX_BATCH_SIZE>
      expected_values;
  {
    // Sample MAX_BATCH_SIZE keys evenly spread across the generated KV map.
    const int step =
        static_cast<int>(kv.size()) / MultiGetContext::MAX_BATCH_SIZE;
    auto it = kv.begin();
    for (int i = 0; i < MultiGetContext::MAX_BATCH_SIZE; i++) {
      keys.emplace_back(it->first);
      if (ts_sz > 0) {
        Slice ukey_without_ts =
            ExtractUserKeyAndStripTimestamp(it->first, ts_sz);
        keys_without_timestamps.push_back(ukey_without_ts);
      } else {
        keys_without_timestamps.emplace_back(ExtractUserKey(it->first));
      }
      values.emplace_back();
      statuses.emplace_back();
      expected_values.push_back(&(it->second));
      std::advance(it, step);
    }
  }

  std::string table_name = "StrictCapacityLimitReaderTest_MultiGet" +
                           CompressionTypeToString(compression_type_);

  ImmutableOptions ioptions(options);
  CreateTable(table_name, ioptions, compression_type_, kv,
              compression_parallel_threads_, compression_dict_bytes_);

  std::unique_ptr<BlockBasedTable> table;
  FileOptions foptions;
  foptions.use_direct_reads = use_direct_reads_;
  InternalKeyComparator comparator(options.comparator);
  NewBlockBasedTableReader(foptions, ioptions, comparator, table_name, &table,
                           true /* bool prefetch_index_and_filter_in_cache */,
                           nullptr /* status */, persist_udt_);

  ASSERT_OK(
      table->VerifyChecksum(read_opts, TableReaderCaller::kUserVerifyChecksum));

  // Ensure that keys are not in cache before MultiGet.
  for (auto& key : keys) {
    ASSERT_FALSE(table->TEST_KeyInCache(read_opts, key.ToString()));
  }

  // Prepare MultiGetContext.
  autovector<GetContext, MultiGetContext::MAX_BATCH_SIZE> get_context;
  autovector<KeyContext, MultiGetContext::MAX_BATCH_SIZE> key_context;
  autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
  for (size_t i = 0; i < keys.size(); ++i) {
    get_context.emplace_back(options.comparator, nullptr, nullptr, nullptr,
                             GetContext::kNotFound, ExtractUserKey(keys[i]),
                             &values[i], nullptr, nullptr, nullptr, nullptr,
                             true /* do_merge */, nullptr, nullptr, nullptr,
                             nullptr, nullptr, nullptr);
    // BUG FIX: each KeyContext must point at ITS OWN status slot. The
    // original code passed &statuses.back(), which aliased every key's
    // status to the last element (statuses was fully populated above), so
    // per-key MemoryLimit errors could be overwritten and the final scan
    // over `statuses` could miss them. The autovector's inline capacity is
    // MAX_BATCH_SIZE, so &statuses[i] stays stable across the loop.
    key_context.emplace_back(nullptr, keys_without_timestamps[i], &values[i],
                             nullptr, nullptr, &statuses[i]);
    key_context.back().get_context = &get_context[i];
  }
  for (auto& key_ctx : key_context) {
    sorted_keys.emplace_back(&key_ctx);
  }
  MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, read_opts,
                      fs_.get(), nullptr);

  // Execute MultiGet.
  MultiGetContext::Range range = ctx.GetMultiGetRange();
  PerfContext* perf_ctx = get_perf_context();
  perf_ctx->Reset();
  table->MultiGet(read_opts, &range, nullptr);

  // At least one data block must have been read from the file (index,
  // filter, and compression-dict reads are excluded from the count).
  ASSERT_GE(perf_ctx->block_read_count - perf_ctx->index_block_read_count -
                perf_ctx->filter_block_read_count -
                perf_ctx->compression_dict_block_read_count,
            1);
  ASSERT_GE(perf_ctx->block_read_byte, 1);

  // With the strict 4 KiB cache, at least one key must have failed with
  // a MemoryLimit status.
  bool hit_memory_limit = false;
  for (const Status& status : statuses) {
    if (!status.ok()) {
      EXPECT_TRUE(status.IsMemoryLimit());
      hit_memory_limit = true;
    }
  }
  ASSERT_TRUE(hit_memory_limit);
}
class BlockBasedTableReaderTestVerifyChecksum
: public BlockBasedTableReaderTest {
public:
@ -829,6 +1022,15 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(false), ::testing::ValuesIn(test::GetUDTTestModes()),
::testing::Values(1, 2), ::testing::Values(0, 4096),
::testing::Values(false, true)));
// Instantiate StrictCapacityLimitReaderTest across all supported compression
// types, both direct-read modes, all UDT test modes, 1 and 2 compression
// threads, and both same-key-diff-ts settings. Index type is pinned to
// kTwoLevelIndexSearch and compression_dict_bytes to 0 to match the
// fixture's ConfigureTableFactory (partitioned filters + two-level index).
INSTANTIATE_TEST_CASE_P(
    StrictCapacityLimitReaderTest, StrictCapacityLimitReaderTest,
    ::testing::Combine(
        ::testing::ValuesIn(GetSupportedCompressions()), ::testing::Bool(),
        ::testing::Values(
            BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch),
        ::testing::Values(false), ::testing::ValuesIn(test::GetUDTTestModes()),
        ::testing::Values(1, 2), ::testing::Values(0),
        ::testing::Values(false, true)));
INSTANTIATE_TEST_CASE_P(
VerifyChecksum, BlockBasedTableReaderTestVerifyChecksum,
::testing::Combine(