Fix stale memory access with FSBuffer and tiered sec cache (#12712)

Summary:
A `BlockBasedTable` with `TieredSecondaryCache` containing a NVM cache inserts blocks into the compressed cache and the corresponding compressed block into the NVM cache. The `BlockFetcher` is used to get the uncompressed and compressed blocks by calling `ReadBlockContents()` and `GetUncompressedBlock()` respectively. If the file system supports FSBuffer (i.e., returning an FS-allocated buffer rather than a caller-provided one), that buffer gets freed between the two calls. This PR fixes it by making the FSBuffer unique pointer a member rather than a local variable.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12712

Test Plan:
1. Add a unit test
2. Release validation stress test

Reviewed By: jaykorean

Differential Revision: D57974026

Pulled By: anand1976

fbshipit-source-id: cfa895914e74b4f628413b40e6e39d8d8e5286bd
This commit is contained in:
anand76 2024-05-30 12:33:58 -07:00
parent 08f93221f5
commit 94ec0b4e6d
4 changed files with 116 additions and 11 deletions

View file

@ -873,6 +873,111 @@ TEST_P(DBTieredAdmPolicyTest, CompressedCacheAdmission) {
Destroy(options);
}
TEST_F(DBTieredSecondaryCacheTest, FSBufferTest) {
  // File system wrapper that advertises FSSupportedOps::kFSBuffer and
  // services MultiRead by handing back buffers it allocates itself. This
  // forces the read path where the caller must keep the FS-provided
  // fs_scratch alive for as long as it reads from the request result.
  class WrapFS : public FileSystemWrapper {
   public:
    explicit WrapFS(const std::shared_ptr<FileSystem>& _target)
        : FileSystemWrapper(_target) {}
    ~WrapFS() override {}
    const char* Name() const override { return "WrapFS"; }

    IOStatus NewRandomAccessFile(const std::string& fname,
                                 const FileOptions& opts,
                                 std::unique_ptr<FSRandomAccessFile>* result,
                                 IODebugContext* dbg) override {
      // Random-access file whose MultiRead allocates a fresh scratch
      // buffer per request and transfers its ownership to the request
      // via fs_scratch.
      class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
       public:
        explicit WrappedRandomAccessFile(
            std::unique_ptr<FSRandomAccessFile>& file)
            : FSRandomAccessFileOwnerWrapper(std::move(file)) {}

        IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
                           const IOOptions& options,
                           IODebugContext* dbg) override {
          for (size_t idx = 0; idx < num_reqs; ++idx) {
            FSReadRequest& request = reqs[idx];
            // Buffer is owned by the request; freed by the custom
            // deleter when fs_scratch is released.
            FSAllocationPtr scratch(new char[request.len], [](void* ptr) {
              delete[] static_cast<char*>(ptr);
            });
            request.fs_scratch = std::move(scratch);
            request.status =
                Read(request.offset, request.len, options, &request.result,
                     static_cast<char*>(request.fs_scratch.get()), dbg);
          }
          return IOStatus::OK();
        }
      };

      std::unique_ptr<FSRandomAccessFile> base_file;
      IOStatus s = target()->NewRandomAccessFile(fname, opts, &base_file, dbg);
      EXPECT_OK(s);
      result->reset(new WrappedRandomAccessFile(base_file));
      return s;
    }

    void SupportedOps(int64_t& supported_ops) override {
      // Advertise async IO plus FS-allocated read buffers.
      supported_ops = 1 << FSSupportedOps::kAsyncIO;
      supported_ops |= 1 << FSSupportedOps::kFSBuffer;
    }
  };

  if (!LZ4_Supported()) {
    ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
    return;
  }

  std::shared_ptr<WrapFS> wrapped_fs =
      std::make_shared<WrapFS>(env_->GetFileSystem());
  std::unique_ptr<Env> wrapped_env(new CompositeEnvWrapper(env_, wrapped_fs));

  // Tiered block cache built by the fixture's NewCache helper with the
  // auto admission policy.
  BlockBasedTableOptions table_options;
  table_options.block_cache = NewCache(250 * 1024, 20 * 1024, 256 * 1024,
                                       TieredAdmissionPolicy::kAdmPolicyAuto,
                                       /*ready_before_wait=*/true);
  table_options.block_size = 4 * 1024;
  table_options.cache_index_and_filter_blocks = false;

  Options options = GetDefaultOptions();
  options.create_if_missing = true;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  options.statistics = CreateDBStatistics();
  options.env = wrapped_env.get();
  options.paranoid_file_checks = false;
  DestroyAndReopen(options);

  // Load compressible values so data blocks are eligible for the
  // compressed/NVM tiers.
  Random rnd(301);
  const int kNumKeys = 256;
  for (int i = 0; i < kNumKeys; i++) {
    std::string compressible_value;
    test::CompressibleString(&rnd, 0.5, 1007, &compressible_value);
    ASSERT_OK(Put(Key(i), compressible_value));
  }
  ASSERT_OK(Flush());

  // Async MultiGet of three cold keys: each should miss and get saved
  // into the NVM secondary cache.
  std::vector<std::string> keys{Key(0), Key(4), Key(8)};
  std::vector<std::string> values =
      MultiGet(keys, /*snapshot=*/nullptr, /*async=*/true);
  ASSERT_EQ(values.size(), keys.size());
  for (const auto& value : values) {
    ASSERT_EQ(1007, value.size());
  }
  ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 3u);
  ASSERT_EQ(nvm_sec_cache()->num_misses(), 3u);
  ASSERT_EQ(nvm_sec_cache()->num_hits(), 0u);

  // A point lookup of another cold key adds one more miss/insert.
  std::string point_value = Get(Key(12));
  ASSERT_EQ(1007, point_value.size());
  ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 4u);
  ASSERT_EQ(nvm_sec_cache()->num_misses(), 4u);
  ASSERT_EQ(options.statistics->getTickerCount(BLOCK_CACHE_MISS), 4u);

  Close();
  Destroy(options);
}
INSTANTIATE_TEST_CASE_P(
DBTieredAdmPolicyTest, DBTieredAdmPolicyTest,
::testing::Values(TieredAdmissionPolicy::kAdmPolicyAuto,

View file

@ -241,7 +241,7 @@ inline void BlockFetcher::GetBlockContents() {
// Read a block from the file and verify its checksum. Upon return, io_status_
// will be updated with the status of the read, and slice_ will be updated
// with a pointer to the data.
void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
void BlockFetcher::ReadBlock(bool retry) {
FSReadRequest read_req;
IOOptions opts;
io_status_ = file_->PrepareIOOptions(read_options_, opts);
@ -336,7 +336,7 @@ void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
if (io_status_.ok()) {
InsertCompressedBlockToPersistentCacheIfNeeded();
fs_buf = std::move(read_req.fs_scratch);
fs_buf_ = std::move(read_req.fs_scratch);
} else {
ReleaseFileSystemProvidedBuffer(&read_req);
direct_io_buf_.reset();
@ -347,7 +347,6 @@ void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
}
IOStatus BlockFetcher::ReadBlockContents() {
FSAllocationPtr fs_buf;
if (TryGetUncompressBlockFromPersistentCache()) {
compression_type_ = kNoCompression;
#ifndef NDEBUG
@ -360,15 +359,15 @@ IOStatus BlockFetcher::ReadBlockContents() {
return io_status_;
}
} else if (!TryGetSerializedBlockFromPersistentCache()) {
ReadBlock(/*retry =*/false, fs_buf);
ReadBlock(/*retry =*/false);
// If the file system supports retry after corruption, then try to
// re-read the block and see if it succeeds.
if (io_status_.IsCorruption() && retry_corrupt_read_) {
assert(!fs_buf);
ReadBlock(/*retry=*/true, fs_buf);
assert(!fs_buf_);
ReadBlock(/*retry=*/true);
}
if (!io_status_.ok()) {
assert(!fs_buf);
assert(!fs_buf_);
return io_status_;
}
}
@ -417,16 +416,15 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
return io_s;
}
if (io_s.ok()) {
FSAllocationPtr fs_buf;
// Data Block is already in prefetch.
got_from_prefetch_buffer_ = true;
ProcessTrailerIfPresent();
if (io_status_.IsCorruption() && retry_corrupt_read_) {
got_from_prefetch_buffer_ = false;
ReadBlock(/*retry = */ true, fs_buf);
ReadBlock(/*retry = */ true);
}
if (!io_status_.ok()) {
assert(!fs_buf);
assert(!fs_buf_);
return io_status_;
}
used_buf_ = const_cast<char*>(slice_.data());

View file

@ -137,6 +137,7 @@ class BlockFetcher {
bool for_compaction_ = false;
bool use_fs_scratch_ = false;
bool retry_corrupt_read_ = false;
FSAllocationPtr fs_buf_;
// return true if found
bool TryGetUncompressBlockFromPersistentCache();
@ -152,7 +153,7 @@ class BlockFetcher {
void InsertCompressedBlockToPersistentCacheIfNeeded();
void InsertUncompressedBlockToPersistentCacheIfNeeded();
void ProcessTrailerIfPresent();
void ReadBlock(bool retry, FSAllocationPtr& fs_buf);
void ReadBlock(bool retry);
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
if (use_fs_scratch_) {

View file

@ -0,0 +1 @@
Fixed a bug causing stale memory access when using the TieredSecondaryCache with an NVM secondary cache, and a file system that supports returning an FS-allocated buffer for MultiRead (FSSupportedOps::kFSBuffer is set).