Change ReadAsync callback API to remove const from FSReadRequest (#11649)

Summary:
Modify ReadAsync callback API to remove const from FSReadRequest as const doesn't let to fs_scratch to move the ownership.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11649

Test Plan: CircleCI jobs

Reviewed By: anand1976

Differential Revision: D53585309

Pulled By: akankshamahajan15

fbshipit-source-id: 3bff9035db0e6fbbe34721a5963443355807420d
This commit is contained in:
Akanksha Mahajan 2024-02-16 09:14:55 -08:00 committed by Facebook GitHub Bot
parent 28c1c15c29
commit 956f1dfde3
15 changed files with 42 additions and 43 deletions

View File

@ -4277,7 +4277,7 @@ class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
const IOOptions& options, IODebugContext* dbg) override; const IOOptions& options, IODebugContext* dbg) override;
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override; IODebugContext* dbg) override;
@ -4423,7 +4423,7 @@ IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len,
IOStatus DeadlineRandomAccessFile::ReadAsync( IOStatus DeadlineRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& opts, FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) { void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
const std::chrono::microseconds deadline = fs_.GetDeadline(); const std::chrono::microseconds deadline = fs_.GetDeadline();
const std::chrono::microseconds io_timeout = fs_.GetIOTimeout(); const std::chrono::microseconds io_timeout = fs_.GetIOTimeout();

View File

@ -60,7 +60,7 @@ class DbStressRandomAccessFileWrapper : public FSRandomAccessFileOwnerWrapper {
} }
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& options, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& options,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override { IODebugContext* dbg) override {
#ifndef NDEBUG #ifndef NDEBUG

10
env/env_test.cc vendored
View File

@ -3447,7 +3447,7 @@ TEST_F(CreateEnvTest, CreateCompositeEnv) {
class ReadAsyncFS; class ReadAsyncFS;
struct MockIOHandle { struct MockIOHandle {
std::function<void(const FSReadRequest&, void*)> cb; std::function<void(FSReadRequest&, void*)> cb;
void* cb_arg; void* cb_arg;
bool create_io_error; bool create_io_error;
}; };
@ -3462,7 +3462,7 @@ class ReadAsyncRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
: FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {} : FSRandomAccessFileOwnerWrapper(std::move(file)), fs_(fs) {}
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override; IODebugContext* dbg) override;
@ -3514,7 +3514,7 @@ class ReadAsyncFS : public FileSystemWrapper {
IOStatus ReadAsyncRandomAccessFile::ReadAsync( IOStatus ReadAsyncRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& opts, FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) { void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
IOHandleDeleter deletefn = [](void* args) -> void { IOHandleDeleter deletefn = [](void* args) -> void {
delete (static_cast<MockIOHandle*>(args)); delete (static_cast<MockIOHandle*>(args));
@ -3602,8 +3602,8 @@ TEST_F(TestAsyncRead, ReadAsync) {
} }
// callback function passed to async read. // callback function passed to async read.
std::function<void(const FSReadRequest&, void*)> callback = std::function<void(FSReadRequest&, void*)> callback =
[&](const FSReadRequest& req, void* cb_arg) { [&](FSReadRequest& req, void* cb_arg) {
assert(cb_arg != nullptr); assert(cb_arg != nullptr);
size_t i = *(reinterpret_cast<size_t*>(cb_arg)); size_t i = *(reinterpret_cast<size_t*>(cb_arg));
reqs[i].offset = req.offset; reqs[i].offset = req.offset;

View File

@ -340,7 +340,7 @@ IOStatus FSRandomAccessFileTracingWrapper::InvalidateCache(size_t offset,
IOStatus FSRandomAccessFileTracingWrapper::ReadAsync( IOStatus FSRandomAccessFileTracingWrapper::ReadAsync(
FSReadRequest& req, const IOOptions& opts, FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) { void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) {
// Create a callback and populate info. // Create a callback and populate info.
auto read_async_callback = auto read_async_callback =
@ -361,8 +361,8 @@ IOStatus FSRandomAccessFileTracingWrapper::ReadAsync(
return s; return s;
} }
void FSRandomAccessFileTracingWrapper::ReadAsyncCallback( void FSRandomAccessFileTracingWrapper::ReadAsyncCallback(FSReadRequest& req,
const FSReadRequest& req, void* cb_arg) { void* cb_arg) {
ReadAsyncCallbackInfo* read_async_cb_info = ReadAsyncCallbackInfo* read_async_cb_info =
static_cast<ReadAsyncCallbackInfo*>(cb_arg); static_cast<ReadAsyncCallbackInfo*>(cb_arg);
assert(read_async_cb_info); assert(read_async_cb_info);

View File

@ -229,11 +229,11 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileOwnerWrapper {
IOStatus InvalidateCache(size_t offset, size_t length) override; IOStatus InvalidateCache(size_t offset, size_t length) override;
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override; IODebugContext* dbg) override;
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); void ReadAsyncCallback(FSReadRequest& req, void* cb_arg);
private: private:
std::shared_ptr<IOTracer> io_tracer_; std::shared_ptr<IOTracer> io_tracer_;
@ -243,7 +243,7 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileOwnerWrapper {
struct ReadAsyncCallbackInfo { struct ReadAsyncCallbackInfo {
uint64_t start_time_; uint64_t start_time_;
std::function<void(const FSReadRequest&, void*)> cb_; std::function<void(FSReadRequest&, void*)> cb_;
void* cb_arg_; void* cb_arg_;
std::string file_op_; std::string file_op_;
}; };

2
env/io_posix.cc vendored
View File

@ -857,7 +857,7 @@ IOStatus PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
IOStatus PosixRandomAccessFile::ReadAsync( IOStatus PosixRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& /*opts*/, FSReadRequest& req, const IOOptions& /*opts*/,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) { void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
if (use_direct_io()) { if (use_direct_io()) {
assert(IsSectorAligned(req.offset, GetRequiredBufferAlignment())); assert(IsSectorAligned(req.offset, GetRequiredBufferAlignment()));

15
env/io_posix.h vendored
View File

@ -76,8 +76,8 @@ inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
#if defined(ROCKSDB_IOURING_PRESENT) #if defined(ROCKSDB_IOURING_PRESENT)
struct Posix_IOHandle { struct Posix_IOHandle {
Posix_IOHandle(struct io_uring* _iu, Posix_IOHandle(struct io_uring* _iu,
std::function<void(const FSReadRequest&, void*)> _cb, std::function<void(FSReadRequest&, void*)> _cb, void* _cb_arg,
void* _cb_arg, uint64_t _offset, size_t _len, char* _scratch, uint64_t _offset, size_t _len, char* _scratch,
bool _use_direct_io, size_t _alignment) bool _use_direct_io, size_t _alignment)
: iu(_iu), : iu(_iu),
cb(_cb), cb(_cb),
@ -92,7 +92,7 @@ struct Posix_IOHandle {
struct iovec iov; struct iovec iov;
struct io_uring* iu; struct io_uring* iu;
std::function<void(const FSReadRequest&, void*)> cb; std::function<void(FSReadRequest&, void*)> cb;
void* cb_arg; void* cb_arg;
uint64_t offset; uint64_t offset;
size_t len; size_t len;
@ -317,10 +317,11 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
size_t GetRequiredBufferAlignment() const override { size_t GetRequiredBufferAlignment() const override {
return logical_sector_size_; return logical_sector_size_;
} }
// EXPERIMENTAL
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, virtual IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle,
IOHandleDeleter* del_fn,
IODebugContext* dbg) override; IODebugContext* dbg) override;
}; };

View File

@ -790,7 +790,7 @@ bool FilePrefetchBuffer::TryReadFromCacheUntracked(
return true; return true;
} }
void FilePrefetchBuffer::PrefetchAsyncCallback(const FSReadRequest& req, void FilePrefetchBuffer::PrefetchAsyncCallback(FSReadRequest& req,
void* cb_arg) { void* cb_arg) {
BufferInfo* buf = static_cast<BufferInfo*>(cb_arg); BufferInfo* buf = static_cast<BufferInfo*>(cb_arg);

View File

@ -376,7 +376,7 @@ class FilePrefetchBuffer {
} }
// Callback function passed to underlying FS in case of asynchronous reads. // Callback function passed to underlying FS in case of asynchronous reads.
void PrefetchAsyncCallback(const FSReadRequest& req, void* cb_arg); void PrefetchAsyncCallback(FSReadRequest& req, void* cb_arg);
void TEST_GetBufferOffsetandSize( void TEST_GetBufferOffsetandSize(
std::vector<std::pair<uint64_t, size_t>>& buffer_info) { std::vector<std::pair<uint64_t, size_t>>& buffer_info) {

View File

@ -486,7 +486,7 @@ IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
IOStatus RandomAccessFileReader::ReadAsync( IOStatus RandomAccessFileReader::ReadAsync(
FSReadRequest& req, const IOOptions& opts, FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) { void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) {
IOStatus s; IOStatus s;
// Create a callback and populate info. // Create a callback and populate info.
@ -557,7 +557,7 @@ IOStatus RandomAccessFileReader::ReadAsync(
return s; return s;
} }
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req, void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req,
void* cb_arg) { void* cb_arg) {
ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg); ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
assert(read_async_info); assert(read_async_info);

View File

@ -91,8 +91,8 @@ class RandomAccessFileReader {
const bool is_last_level_; const bool is_last_level_;
struct ReadAsyncInfo { struct ReadAsyncInfo {
ReadAsyncInfo(std::function<void(const FSReadRequest&, void*)> cb, ReadAsyncInfo(std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void* cb_arg, uint64_t start_time) uint64_t start_time)
: cb_(cb), : cb_(cb),
cb_arg_(cb_arg), cb_arg_(cb_arg),
start_time_(start_time), start_time_(start_time),
@ -102,7 +102,7 @@ class RandomAccessFileReader {
user_len_(0), user_len_(0),
is_aligned_(false) {} is_aligned_(false) {}
std::function<void(const FSReadRequest&, void*)> cb_; std::function<void(FSReadRequest&, void*)> cb_;
void* cb_arg_; void* cb_arg_;
uint64_t start_time_; uint64_t start_time_;
FileOperationInfo::StartTimePoint fs_start_ts_; FileOperationInfo::StartTimePoint fs_start_ts_;
@ -188,10 +188,10 @@ class RandomAccessFileReader {
IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts) const; IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts) const;
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
AlignedBuf* aligned_buf); AlignedBuf* aligned_buf);
void ReadAsyncCallback(const FSReadRequest& req, void* cb_arg); void ReadAsyncCallback(FSReadRequest& req, void* cb_arg);
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

View File

@ -962,10 +962,10 @@ class FSRandomAccessFile {
// AbortIO API. // AbortIO API.
// //
// Default implementation is to read the data synchronously. // Default implementation is to read the data synchronously.
virtual IOStatus ReadAsync( virtual IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
FSReadRequest& req, const IOOptions& opts, std::function<void(FSReadRequest&, void*)> cb,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, void* cb_arg, void** /*io_handle*/,
void** /*io_handle*/, IOHandleDeleter* /*del_fn*/, IODebugContext* dbg) { IOHandleDeleter* /*del_fn*/, IODebugContext* dbg) {
req.status = req.status =
Read(req.offset, req.len, opts, &(req.result), req.scratch, dbg); Read(req.offset, req.len, opts, &(req.result), req.scratch, dbg);
cb(req, cb_arg); cb(req, cb_arg);
@ -1687,7 +1687,7 @@ class FSRandomAccessFileWrapper : public FSRandomAccessFile {
return target_->InvalidateCache(offset, length); return target_->InvalidateCache(offset, length);
} }
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override { IODebugContext* dbg) override {
return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg); return target()->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, dbg);

View File

@ -22,14 +22,12 @@ bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) {
for (size_t i = 0; i < awaiter->num_reqs_; ++i) { for (size_t i = 0; i < awaiter->num_reqs_; ++i) {
IOStatus s = awaiter->file_->ReadAsync( IOStatus s = awaiter->file_->ReadAsync(
awaiter->read_reqs_[i], awaiter->opts_, awaiter->read_reqs_[i], awaiter->opts_,
[](const FSReadRequest& req, void* cb_arg) { [](FSReadRequest& req, void* cb_arg) {
FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg); FSReadRequest* read_req = static_cast<FSReadRequest*>(cb_arg);
read_req->status = req.status; read_req->status = req.status;
read_req->result = req.result; read_req->result = req.result;
if (req.fs_scratch != nullptr) { if (req.fs_scratch != nullptr) {
// TODO akanksha: Revisit to remove the const in the callback. read_req->fs_scratch = std::move(req.fs_scratch);
FSReadRequest& req_tmp = const_cast<FSReadRequest&>(req);
read_req->fs_scratch = std::move(req_tmp.fs_scratch);
} }
}, },
&awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i], &awaiter->read_reqs_[i], &awaiter->io_handle_[i], &awaiter->del_fn_[i],

View File

@ -416,7 +416,7 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
IOStatus TestFSRandomAccessFile::ReadAsync( IOStatus TestFSRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& opts, FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg, std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) { void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
IOStatus ret; IOStatus ret;
IOStatus s; IOStatus s;

View File

@ -143,7 +143,7 @@ class TestFSRandomAccessFile : public FSRandomAccessFile {
Slice* result, char* scratch, Slice* result, char* scratch,
IODebugContext* dbg) const override; IODebugContext* dbg) const override;
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
std::function<void(const FSReadRequest&, void*)> cb, std::function<void(FSReadRequest&, void*)> cb,
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
IODebugContext* dbg) override; IODebugContext* dbg) override;
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,