mirror of https://github.com/facebook/rocksdb.git
Fix stale memory access with FSBuffer and tiered sec cache (#12712)
Summary: A `BlockBasedTable` with `TieredSecondaryCache` containing a NVM cache inserts blocks into the compressed cache and the corresponding compressed block into the NVM cache. The `BlockFetcher` is used to get the uncompressed and compressed blocks by calling `ReadBlockContents()` and `GetUncompressedBlock()` respectively. If the file system supports FSBuffer (i.e returning a FS allocated buffer rather than caller provided), that buffer gets freed between the two calls. This PR fixes it by making the FSBuffer unique pointer a member rather than local variable. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12712 Test Plan: 1. Add a unit test 2. Release validation stress test Reviewed By: jaykorean Differential Revision: D57974026 Pulled By: anand1976 fbshipit-source-id: cfa895914e74b4f628413b40e6e39d8d8e5286bd
This commit is contained in:
parent
20777b96cb
commit
0ae3d9f98d
|
@ -873,6 +873,111 @@ TEST_P(DBTieredAdmPolicyTest, CompressedCacheAdmission) {
|
||||||
Destroy(options);
|
Destroy(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBTieredSecondaryCacheTest, FSBufferTest) {
|
||||||
|
class WrapFS : public FileSystemWrapper {
|
||||||
|
public:
|
||||||
|
explicit WrapFS(const std::shared_ptr<FileSystem>& _target)
|
||||||
|
: FileSystemWrapper(_target) {}
|
||||||
|
~WrapFS() override {}
|
||||||
|
const char* Name() const override { return "WrapFS"; }
|
||||||
|
|
||||||
|
IOStatus NewRandomAccessFile(const std::string& fname,
|
||||||
|
const FileOptions& opts,
|
||||||
|
std::unique_ptr<FSRandomAccessFile>* result,
|
||||||
|
IODebugContext* dbg) override {
|
||||||
|
class WrappedRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
|
||||||
|
public:
|
||||||
|
explicit WrappedRandomAccessFile(
|
||||||
|
std::unique_ptr<FSRandomAccessFile>& file)
|
||||||
|
: FSRandomAccessFileOwnerWrapper(std::move(file)) {}
|
||||||
|
|
||||||
|
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
||||||
|
const IOOptions& options,
|
||||||
|
IODebugContext* dbg) override {
|
||||||
|
for (size_t i = 0; i < num_reqs; ++i) {
|
||||||
|
FSReadRequest& req = reqs[i];
|
||||||
|
FSAllocationPtr buffer(new char[req.len], [](void* ptr) {
|
||||||
|
delete[] static_cast<char*>(ptr);
|
||||||
|
});
|
||||||
|
req.fs_scratch = std::move(buffer);
|
||||||
|
req.status = Read(req.offset, req.len, options, &req.result,
|
||||||
|
static_cast<char*>(req.fs_scratch.get()), dbg);
|
||||||
|
}
|
||||||
|
return IOStatus::OK();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::unique_ptr<FSRandomAccessFile> file;
|
||||||
|
IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
|
||||||
|
EXPECT_OK(s);
|
||||||
|
result->reset(new WrappedRandomAccessFile(file));
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SupportedOps(int64_t& supported_ops) override {
|
||||||
|
supported_ops = 1 << FSSupportedOps::kAsyncIO;
|
||||||
|
supported_ops |= 1 << FSSupportedOps::kFSBuffer;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!LZ4_Supported()) {
|
||||||
|
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<WrapFS> wrap_fs =
|
||||||
|
std::make_shared<WrapFS>(env_->GetFileSystem());
|
||||||
|
std::unique_ptr<Env> wrap_env(new CompositeEnvWrapper(env_, wrap_fs));
|
||||||
|
BlockBasedTableOptions table_options;
|
||||||
|
table_options.block_cache = NewCache(250 * 1024, 20 * 1024, 256 * 1024,
|
||||||
|
TieredAdmissionPolicy::kAdmPolicyAuto,
|
||||||
|
/*ready_before_wait=*/true);
|
||||||
|
table_options.block_size = 4 * 1024;
|
||||||
|
table_options.cache_index_and_filter_blocks = false;
|
||||||
|
Options options = GetDefaultOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
options.statistics = CreateDBStatistics();
|
||||||
|
options.env = wrap_env.get();
|
||||||
|
|
||||||
|
options.paranoid_file_checks = false;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
Random rnd(301);
|
||||||
|
const int N = 256;
|
||||||
|
for (int i = 0; i < N; i++) {
|
||||||
|
std::string p_v;
|
||||||
|
test::CompressibleString(&rnd, 0.5, 1007, &p_v);
|
||||||
|
ASSERT_OK(Put(Key(i), p_v));
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
std::vector<std::string> keys;
|
||||||
|
std::vector<std::string> values;
|
||||||
|
|
||||||
|
keys.push_back(Key(0));
|
||||||
|
keys.push_back(Key(4));
|
||||||
|
keys.push_back(Key(8));
|
||||||
|
values = MultiGet(keys, /*snapshot=*/nullptr, /*async=*/true);
|
||||||
|
ASSERT_EQ(values.size(), keys.size());
|
||||||
|
for (const auto& value : values) {
|
||||||
|
ASSERT_EQ(1007, value.size());
|
||||||
|
}
|
||||||
|
ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 3u);
|
||||||
|
ASSERT_EQ(nvm_sec_cache()->num_misses(), 3u);
|
||||||
|
ASSERT_EQ(nvm_sec_cache()->num_hits(), 0u);
|
||||||
|
|
||||||
|
std::string v = Get(Key(12));
|
||||||
|
ASSERT_EQ(1007, v.size());
|
||||||
|
ASSERT_EQ(nvm_sec_cache()->num_insert_saved(), 4u);
|
||||||
|
ASSERT_EQ(nvm_sec_cache()->num_misses(), 4u);
|
||||||
|
ASSERT_EQ(options.statistics->getTickerCount(BLOCK_CACHE_MISS), 4u);
|
||||||
|
|
||||||
|
Close();
|
||||||
|
Destroy(options);
|
||||||
|
}
|
||||||
|
|
||||||
INSTANTIATE_TEST_CASE_P(
|
INSTANTIATE_TEST_CASE_P(
|
||||||
DBTieredAdmPolicyTest, DBTieredAdmPolicyTest,
|
DBTieredAdmPolicyTest, DBTieredAdmPolicyTest,
|
||||||
::testing::Values(TieredAdmissionPolicy::kAdmPolicyAuto,
|
::testing::Values(TieredAdmissionPolicy::kAdmPolicyAuto,
|
||||||
|
|
|
@ -241,7 +241,7 @@ inline void BlockFetcher::GetBlockContents() {
|
||||||
// Read a block from the file and verify its checksum. Upon return, io_status_
|
// Read a block from the file and verify its checksum. Upon return, io_status_
|
||||||
// will be updated with the status of the read, and slice_ will be updated
|
// will be updated with the status of the read, and slice_ will be updated
|
||||||
// with a pointer to the data.
|
// with a pointer to the data.
|
||||||
void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
|
void BlockFetcher::ReadBlock(bool retry) {
|
||||||
FSReadRequest read_req;
|
FSReadRequest read_req;
|
||||||
IOOptions opts;
|
IOOptions opts;
|
||||||
io_status_ = file_->PrepareIOOptions(read_options_, opts);
|
io_status_ = file_->PrepareIOOptions(read_options_, opts);
|
||||||
|
@ -336,7 +336,7 @@ void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
|
||||||
|
|
||||||
if (io_status_.ok()) {
|
if (io_status_.ok()) {
|
||||||
InsertCompressedBlockToPersistentCacheIfNeeded();
|
InsertCompressedBlockToPersistentCacheIfNeeded();
|
||||||
fs_buf = std::move(read_req.fs_scratch);
|
fs_buf_ = std::move(read_req.fs_scratch);
|
||||||
} else {
|
} else {
|
||||||
ReleaseFileSystemProvidedBuffer(&read_req);
|
ReleaseFileSystemProvidedBuffer(&read_req);
|
||||||
direct_io_buf_.reset();
|
direct_io_buf_.reset();
|
||||||
|
@ -347,7 +347,6 @@ void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
IOStatus BlockFetcher::ReadBlockContents() {
|
IOStatus BlockFetcher::ReadBlockContents() {
|
||||||
FSAllocationPtr fs_buf;
|
|
||||||
if (TryGetUncompressBlockFromPersistentCache()) {
|
if (TryGetUncompressBlockFromPersistentCache()) {
|
||||||
compression_type_ = kNoCompression;
|
compression_type_ = kNoCompression;
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
|
@ -360,15 +359,15 @@ IOStatus BlockFetcher::ReadBlockContents() {
|
||||||
return io_status_;
|
return io_status_;
|
||||||
}
|
}
|
||||||
} else if (!TryGetSerializedBlockFromPersistentCache()) {
|
} else if (!TryGetSerializedBlockFromPersistentCache()) {
|
||||||
ReadBlock(/*retry =*/false, fs_buf);
|
ReadBlock(/*retry =*/false);
|
||||||
// If the file system supports retry after corruption, then try to
|
// If the file system supports retry after corruption, then try to
|
||||||
// re-read the block and see if it succeeds.
|
// re-read the block and see if it succeeds.
|
||||||
if (io_status_.IsCorruption() && retry_corrupt_read_) {
|
if (io_status_.IsCorruption() && retry_corrupt_read_) {
|
||||||
assert(!fs_buf);
|
assert(!fs_buf_);
|
||||||
ReadBlock(/*retry=*/true, fs_buf);
|
ReadBlock(/*retry=*/true);
|
||||||
}
|
}
|
||||||
if (!io_status_.ok()) {
|
if (!io_status_.ok()) {
|
||||||
assert(!fs_buf);
|
assert(!fs_buf_);
|
||||||
return io_status_;
|
return io_status_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -417,16 +416,15 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
|
||||||
return io_s;
|
return io_s;
|
||||||
}
|
}
|
||||||
if (io_s.ok()) {
|
if (io_s.ok()) {
|
||||||
FSAllocationPtr fs_buf;
|
|
||||||
// Data Block is already in prefetch.
|
// Data Block is already in prefetch.
|
||||||
got_from_prefetch_buffer_ = true;
|
got_from_prefetch_buffer_ = true;
|
||||||
ProcessTrailerIfPresent();
|
ProcessTrailerIfPresent();
|
||||||
if (io_status_.IsCorruption() && retry_corrupt_read_) {
|
if (io_status_.IsCorruption() && retry_corrupt_read_) {
|
||||||
got_from_prefetch_buffer_ = false;
|
got_from_prefetch_buffer_ = false;
|
||||||
ReadBlock(/*retry = */ true, fs_buf);
|
ReadBlock(/*retry = */ true);
|
||||||
}
|
}
|
||||||
if (!io_status_.ok()) {
|
if (!io_status_.ok()) {
|
||||||
assert(!fs_buf);
|
assert(!fs_buf_);
|
||||||
return io_status_;
|
return io_status_;
|
||||||
}
|
}
|
||||||
used_buf_ = const_cast<char*>(slice_.data());
|
used_buf_ = const_cast<char*>(slice_.data());
|
||||||
|
|
|
@ -137,6 +137,7 @@ class BlockFetcher {
|
||||||
bool for_compaction_ = false;
|
bool for_compaction_ = false;
|
||||||
bool use_fs_scratch_ = false;
|
bool use_fs_scratch_ = false;
|
||||||
bool retry_corrupt_read_ = false;
|
bool retry_corrupt_read_ = false;
|
||||||
|
FSAllocationPtr fs_buf_;
|
||||||
|
|
||||||
// return true if found
|
// return true if found
|
||||||
bool TryGetUncompressBlockFromPersistentCache();
|
bool TryGetUncompressBlockFromPersistentCache();
|
||||||
|
@ -152,7 +153,7 @@ class BlockFetcher {
|
||||||
void InsertCompressedBlockToPersistentCacheIfNeeded();
|
void InsertCompressedBlockToPersistentCacheIfNeeded();
|
||||||
void InsertUncompressedBlockToPersistentCacheIfNeeded();
|
void InsertUncompressedBlockToPersistentCacheIfNeeded();
|
||||||
void ProcessTrailerIfPresent();
|
void ProcessTrailerIfPresent();
|
||||||
void ReadBlock(bool retry, FSAllocationPtr& fs_buf);
|
void ReadBlock(bool retry);
|
||||||
|
|
||||||
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
|
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
|
||||||
if (use_fs_scratch_) {
|
if (use_fs_scratch_) {
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed a bug causing stale memory access when using the TieredSecondaryCache with an NVM secondary cache, and a file system that supports return an FS allocated buffer for MultiRead (FSSupportedOps::kFSBuffer is set).
|
Loading…
Reference in New Issue