mirror of https://github.com/facebook/rocksdb.git
Fix heap use after free error on retry after checksum mismatch (#12464)
Summary: Fix the heap use after free bug caused by freeing the file system IO buffer in `BlockFetcher::ReadBlock()` instead of the caller. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12464 Test Plan: Update the `DBIOCorruptionTest` tests Reviewed By: akankshamahajan15 Differential Revision: D55206920 Pulled By: anand1976 fbshipit-source-id: fd6b608a61cd229b20c1e5f348ff3cc92328de0f
This commit is contained in:
parent
088dc7283b
commit
3b736c4aa3
|
@ -20,13 +20,15 @@ class CorruptionFS : public FileSystemWrapper {
|
|||
bool writable_file_error_;
|
||||
int num_writable_file_errors_;
|
||||
|
||||
explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target)
|
||||
explicit CorruptionFS(const std::shared_ptr<FileSystem>& _target,
|
||||
bool fs_buffer)
|
||||
: FileSystemWrapper(_target),
|
||||
writable_file_error_(false),
|
||||
num_writable_file_errors_(0),
|
||||
corruption_trigger_(INT_MAX),
|
||||
read_count_(0),
|
||||
rnd_(300) {}
|
||||
rnd_(300),
|
||||
fs_buffer_(fs_buffer) {}
|
||||
~CorruptionFS() override {
|
||||
// Assert that the corruption was reset, which means it got triggered
|
||||
assert(corruption_trigger_ == INT_MAX);
|
||||
|
@ -81,7 +83,21 @@ class CorruptionFS : public FileSystemWrapper {
|
|||
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
||||
const IOOptions& options,
|
||||
IODebugContext* dbg) override {
|
||||
return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
|
||||
for (size_t i = 0; i < num_reqs; ++i) {
|
||||
FSReadRequest& req = reqs[i];
|
||||
if (fs_.fs_buffer_) {
|
||||
FSAllocationPtr buffer(new char[req.len], [](void* ptr) {
|
||||
delete[] static_cast<char*>(ptr);
|
||||
});
|
||||
req.fs_scratch = std::move(buffer);
|
||||
req.status = Read(req.offset, req.len, options, &req.result,
|
||||
static_cast<char*>(req.fs_scratch.get()), dbg);
|
||||
} else {
|
||||
req.status = Read(req.offset, req.len, options, &req.result,
|
||||
req.scratch, dbg);
|
||||
}
|
||||
}
|
||||
return IOStatus::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
|
@ -99,12 +115,16 @@ class CorruptionFS : public FileSystemWrapper {
|
|||
void SupportedOps(int64_t& supported_ops) override {
|
||||
supported_ops = 1 << FSSupportedOps::kVerifyAndReconstructRead |
|
||||
1 << FSSupportedOps::kAsyncIO;
|
||||
if (fs_buffer_) {
|
||||
supported_ops |= 1 << FSSupportedOps::kFSBuffer;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
int corruption_trigger_;
|
||||
int read_count_;
|
||||
Random rnd_;
|
||||
bool fs_buffer_;
|
||||
};
|
||||
} // anonymous namespace
|
||||
|
||||
|
@ -674,8 +694,9 @@ TEST_F(DBIOFailureTest, CompactionSstSyncError) {
|
|||
}
|
||||
#endif // !(defined NDEBUG) || !defined(OS_WIN)
|
||||
|
||||
class DBIOCorruptionTest : public DBIOFailureTest,
|
||||
public testing::WithParamInterface<bool> {
|
||||
class DBIOCorruptionTest
|
||||
: public DBIOFailureTest,
|
||||
public testing::WithParamInterface<std::tuple<bool, bool>> {
|
||||
public:
|
||||
DBIOCorruptionTest() : DBIOFailureTest() {
|
||||
BlockBasedTableOptions bbto;
|
||||
|
@ -683,7 +704,8 @@ class DBIOCorruptionTest : public DBIOFailureTest,
|
|||
|
||||
base_env_ = env_;
|
||||
EXPECT_NE(base_env_, nullptr);
|
||||
fs_.reset(new CorruptionFS(base_env_->GetFileSystem()));
|
||||
fs_.reset(
|
||||
new CorruptionFS(base_env_->GetFileSystem(), std::get<0>(GetParam())));
|
||||
env_guard_ = NewCompositeEnv(fs_);
|
||||
options.env = env_guard_.get();
|
||||
bbto.num_file_reads_for_auto_readahead = 0;
|
||||
|
@ -714,7 +736,7 @@ TEST_P(DBIOCorruptionTest, GetReadCorruptionRetry) {
|
|||
|
||||
std::string val;
|
||||
ReadOptions ro;
|
||||
ro.async_io = GetParam();
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
ASSERT_OK(dbfull()->Get(ReadOptions(), "key1", &val));
|
||||
ASSERT_EQ(val, "val1");
|
||||
}
|
||||
|
@ -729,7 +751,7 @@ TEST_P(DBIOCorruptionTest, IterReadCorruptionRetry) {
|
|||
|
||||
ReadOptions ro;
|
||||
ro.readahead_size = 65536;
|
||||
ro.async_io = GetParam();
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
|
||||
Iterator* iter = dbfull()->NewIterator(ro);
|
||||
iter->SeekToFirst();
|
||||
|
@ -754,7 +776,7 @@ TEST_P(DBIOCorruptionTest, MultiGetReadCorruptionRetry) {
|
|||
std::vector<PinnableSlice> values(keys.size());
|
||||
std::vector<Status> statuses(keys.size());
|
||||
ReadOptions ro;
|
||||
ro.async_io = GetParam();
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
|
||||
keys.data(), values.data(), statuses.data());
|
||||
ASSERT_EQ(values[0].ToString(), "val1");
|
||||
|
@ -775,7 +797,7 @@ TEST_P(DBIOCorruptionTest, CompactionReadCorruptionRetry) {
|
|||
|
||||
std::string val;
|
||||
ReadOptions ro;
|
||||
ro.async_io = GetParam();
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
|
||||
ASSERT_EQ(val, "val1");
|
||||
}
|
||||
|
@ -790,13 +812,13 @@ TEST_P(DBIOCorruptionTest, FlushReadCorruptionRetry) {
|
|||
|
||||
std::string val;
|
||||
ReadOptions ro;
|
||||
ro.async_io = GetParam();
|
||||
ro.async_io = std::get<1>(GetParam());
|
||||
ASSERT_OK(dbfull()->Get(ro, "key1", &val));
|
||||
ASSERT_EQ(val, "val1");
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest,
|
||||
testing::Bool());
|
||||
testing::Combine(testing::Bool(), testing::Bool()));
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
|
|
@ -789,6 +789,8 @@ class FSSequentialFile {
|
|||
// SequentialFileWrapper too.
|
||||
};
|
||||
|
||||
using FSAllocationPtr = std::unique_ptr<void, std::function<void(void*)>>;
|
||||
|
||||
// A read IO request structure for use in MultiRead and asynchronous Read APIs.
|
||||
struct FSReadRequest {
|
||||
// Input parameter that represents the file offset in bytes.
|
||||
|
@ -855,7 +857,7 @@ struct FSReadRequest {
|
|||
// - FSReadRequest::result should point to fs_scratch.
|
||||
// - This is needed only if FSSupportedOps::kFSBuffer support is provided by
|
||||
// underlying FS.
|
||||
std::unique_ptr<void, std::function<void(void*)>> fs_scratch;
|
||||
FSAllocationPtr fs_scratch;
|
||||
};
|
||||
|
||||
// A file abstraction for randomly reading the contents of a file.
|
||||
|
|
|
@ -241,7 +241,7 @@ inline void BlockFetcher::GetBlockContents() {
|
|||
// Read a block from the file and verify its checksum. Upon return, io_status_
|
||||
// will be updated with the status of the read, and slice_ will be updated
|
||||
// with a pointer to the data.
|
||||
void BlockFetcher::ReadBlock(bool retry) {
|
||||
void BlockFetcher::ReadBlock(bool retry, FSAllocationPtr& fs_buf) {
|
||||
FSReadRequest read_req;
|
||||
IOOptions opts;
|
||||
io_status_ = file_->PrepareIOOptions(read_options_, opts);
|
||||
|
@ -336,6 +336,7 @@ void BlockFetcher::ReadBlock(bool retry) {
|
|||
|
||||
if (io_status_.ok()) {
|
||||
InsertCompressedBlockToPersistentCacheIfNeeded();
|
||||
fs_buf = std::move(read_req.fs_scratch);
|
||||
} else {
|
||||
ReleaseFileSystemProvidedBuffer(&read_req);
|
||||
direct_io_buf_.reset();
|
||||
|
@ -346,8 +347,7 @@ void BlockFetcher::ReadBlock(bool retry) {
|
|||
}
|
||||
|
||||
IOStatus BlockFetcher::ReadBlockContents() {
|
||||
FSReadRequest read_req;
|
||||
read_req.status.PermitUncheckedError();
|
||||
FSAllocationPtr fs_buf;
|
||||
if (TryGetUncompressBlockFromPersistentCache()) {
|
||||
compression_type_ = kNoCompression;
|
||||
#ifndef NDEBUG
|
||||
|
@ -360,13 +360,15 @@ IOStatus BlockFetcher::ReadBlockContents() {
|
|||
return io_status_;
|
||||
}
|
||||
} else if (!TryGetSerializedBlockFromPersistentCache()) {
|
||||
ReadBlock(/*retry =*/false);
|
||||
ReadBlock(/*retry =*/false, fs_buf);
|
||||
// If the file system supports retry after corruption, then try to
|
||||
// re-read the block and see if it succeeds.
|
||||
if (io_status_.IsCorruption() && retry_corrupt_read_) {
|
||||
ReadBlock(/*retry=*/true);
|
||||
assert(!fs_buf);
|
||||
ReadBlock(/*retry=*/true, fs_buf);
|
||||
}
|
||||
if (!io_status_.ok()) {
|
||||
assert(!fs_buf);
|
||||
return io_status_;
|
||||
}
|
||||
}
|
||||
|
@ -390,7 +392,6 @@ IOStatus BlockFetcher::ReadBlockContents() {
|
|||
}
|
||||
|
||||
InsertUncompressedBlockToPersistentCacheIfNeeded();
|
||||
ReleaseFileSystemProvidedBuffer(&read_req);
|
||||
|
||||
return io_status_;
|
||||
}
|
||||
|
@ -416,14 +417,16 @@ IOStatus BlockFetcher::ReadAsyncBlockContents() {
|
|||
return io_s;
|
||||
}
|
||||
if (io_s.ok()) {
|
||||
FSAllocationPtr fs_buf;
|
||||
// Data Block is already in prefetch.
|
||||
got_from_prefetch_buffer_ = true;
|
||||
ProcessTrailerIfPresent();
|
||||
if (io_status_.IsCorruption() && retry_corrupt_read_) {
|
||||
got_from_prefetch_buffer_ = false;
|
||||
ReadBlock(/*retry = */ true);
|
||||
ReadBlock(/*retry = */ true, fs_buf);
|
||||
}
|
||||
if (!io_status_.ok()) {
|
||||
assert(!fs_buf);
|
||||
return io_status_;
|
||||
}
|
||||
used_buf_ = const_cast<char*>(slice_.data());
|
||||
|
|
|
@ -152,7 +152,7 @@ class BlockFetcher {
|
|||
void InsertCompressedBlockToPersistentCacheIfNeeded();
|
||||
void InsertUncompressedBlockToPersistentCacheIfNeeded();
|
||||
void ProcessTrailerIfPresent();
|
||||
void ReadBlock(bool retry);
|
||||
void ReadBlock(bool retry, FSAllocationPtr& fs_buf);
|
||||
|
||||
void ReleaseFileSystemProvidedBuffer(FSReadRequest* read_req) {
|
||||
if (use_fs_scratch_) {
|
||||
|
|
Loading…
Reference in New Issue