mirror of https://github.com/facebook/rocksdb.git
Implement AbortIO using io_uring (#10125)
Summary: Implement AbortIO in posix using io_uring to cancel any pending read requests that have been submitted. Each request is cancelled using io_uring_prep_cancel, which prepares an IORING_OP_ASYNC_CANCEL request. To cancel a request, the sqe must have ->addr set to the user_data of the request it wishes to cancel. If the request is cancelled successfully, the original request is completed with -ECANCELED and the cancel request is completed with a result of 0. If the request was already running, the original may or may not complete in error; the cancel request will complete with -EALREADY in that case. Finally, if the request to cancel wasn't found, the cancel request is completed with -ENOENT. Reference: https://kernel.dk/io_uring-whatsnew.pdf, https://lore.kernel.org/io-uring/d9a8d76d23690842f666c326631ecc2d85b6c1bc.1615566409.git.asml.silence@gmail.com/ Pull Request resolved: https://github.com/facebook/rocksdb/pull/10125 Test Plan: Existing Posix tests. Reviewed By: anand1976 Differential Revision: D36946970 Pulled By: akankshamahajan15 fbshipit-source-id: 3bc1f1521b3151d01a348fc6431eb3fc85db3a14
This commit is contained in:
parent
04bd347995
commit
c0e0f30667
|
@ -31,6 +31,7 @@
|
|||
* Add an extra sanity check in `GetSortedWalFiles()` (also used by `GetLiveFilesStorageInfo()`, `BackupEngine`, and `Checkpoint`) to reduce risk of successfully created backup or checkpoint failing to open because of missing WAL file.
|
||||
* Add a new column family option `blob_file_starting_level` to enable writing blob files during flushes and compactions starting from the specified LSM tree level.
|
||||
* Add support for timestamped snapshots (#9879)
|
||||
* Provide support for AbortIO in posix to cancel submitted asynchronous requests using io_uring.
|
||||
|
||||
### Behavior changes
|
||||
* DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
|
||||
|
|
|
@ -1124,17 +1124,104 @@ class PosixFileSystem : public FileSystem {
|
|||
#endif
|
||||
}
|
||||
|
||||
// TODO akanksha: Look into flags and see how to provide support for AbortIO
|
||||
// in posix for IOUring requests. Currently it calls Poll to wait for requests
|
||||
// to complete the request.
|
||||
virtual IOStatus AbortIO(std::vector<void*>& io_handles) override {
|
||||
IOStatus s = Poll(io_handles, io_handles.size());
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
// io_uring_queue_init.
|
||||
struct io_uring* iu = nullptr;
|
||||
if (thread_local_io_urings_) {
|
||||
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
|
||||
}
|
||||
|
||||
// Init failed, platform doesn't support io_uring.
|
||||
// If Poll is not supported then it didn't submit any request and it should
|
||||
// return OK.
|
||||
if (s.IsNotSupported()) {
|
||||
if (iu == nullptr) {
|
||||
return IOStatus::OK();
|
||||
}
|
||||
return s;
|
||||
|
||||
for (size_t i = 0; i < io_handles.size(); i++) {
|
||||
Posix_IOHandle* posix_handle =
|
||||
static_cast<Posix_IOHandle*>(io_handles[i]);
|
||||
if (posix_handle->is_finished == true) {
|
||||
continue;
|
||||
}
|
||||
assert(posix_handle->iu == iu);
|
||||
if (posix_handle->iu != iu) {
|
||||
return IOStatus::IOError("");
|
||||
}
|
||||
|
||||
// Prepare the cancel request.
|
||||
struct io_uring_sqe* sqe;
|
||||
sqe = io_uring_get_sqe(iu);
|
||||
io_uring_prep_cancel(sqe, posix_handle, 0);
|
||||
io_uring_sqe_set_data(sqe, posix_handle);
|
||||
|
||||
// submit the request.
|
||||
ssize_t ret = io_uring_submit(iu);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
|
||||
return IOStatus::IOError("io_uring_submit() requested but returned " +
|
||||
std::to_string(ret));
|
||||
}
|
||||
}
|
||||
|
||||
// After submitting the requests, wait for the requests.
|
||||
for (size_t i = 0; i < io_handles.size(); i++) {
|
||||
if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
|
||||
continue;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
struct io_uring_cqe* cqe = nullptr;
|
||||
ssize_t ret = io_uring_wait_cqe(iu, &cqe);
|
||||
if (ret) {
|
||||
// abort as it shouldn't be in indeterminate state and there is no
|
||||
// good way currently to handle this error.
|
||||
abort();
|
||||
}
|
||||
assert(cqe != nullptr);
|
||||
|
||||
Posix_IOHandle* posix_handle =
|
||||
static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
|
||||
assert(posix_handle->iu == iu);
|
||||
if (posix_handle->iu != iu) {
|
||||
return IOStatus::IOError("");
|
||||
}
|
||||
posix_handle->req_count++;
|
||||
|
||||
// Reset cqe data to catch any stray reuse of it
|
||||
static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
|
||||
io_uring_cqe_seen(iu, cqe);
|
||||
|
||||
// - If the request is cancelled successfully, the original request is
|
||||
// completed with -ECANCELED and the cancel request is completed with
|
||||
// a result of 0.
|
||||
// - If the request was already running, the original may or
|
||||
// may not complete in error. The cancel request will complete with
|
||||
// -EALREADY for that case.
|
||||
// - And finally, if the request to cancel wasn't
|
||||
// found, the cancel request is completed with -ENOENT.
|
||||
//
|
||||
// Every handle has to wait for 2 requests completion: original one and
|
||||
// the cancel request which is tracked by PosixHandle::req_count.
|
||||
if (posix_handle->req_count == 2 &&
|
||||
static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
|
||||
posix_handle->is_finished = true;
|
||||
FSReadRequest req;
|
||||
req.status = IOStatus::Aborted();
|
||||
posix_handle->cb(req, posix_handle->cb_arg);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return IOStatus::OK();
|
||||
#else
|
||||
// If Poll is not supported then it didn't submit any request and it should
|
||||
// return OK.
|
||||
(void)io_handles;
|
||||
return IOStatus::OK();
|
||||
#endif
|
||||
}
|
||||
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
|
|
|
@ -62,6 +62,8 @@ struct Posix_IOHandle {
|
|||
size_t len;
|
||||
char* scratch;
|
||||
bool is_finished = false;
|
||||
// req_count is used by AbortIO API to keep track of number of requests.
|
||||
uint32_t req_count = 0;
|
||||
};
|
||||
|
||||
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
|
||||
|
|
|
@ -125,6 +125,13 @@ class IOStatus : public Status {
|
|||
return IOStatus(kIOError, kIOFenced, msg, msg2);
|
||||
}
|
||||
|
||||
// Returns an IOStatus constructed with code kAborted and the subcode `msg`
// (kNone when omitted).
static IOStatus Aborted(SubCode msg = kNone) {
|
||||
return IOStatus(kAborted, msg);
|
||||
}
|
||||
// Returns an IOStatus constructed with code kAborted and the messages `msg`
// and `msg2`; `msg2` defaults to an empty Slice.
static IOStatus Aborted(const Slice& msg, const Slice& msg2 = Slice()) {
|
||||
return IOStatus(kAborted, msg, msg2);
|
||||
}
|
||||
|
||||
// Return a string representation of this status suitable for printing.
|
||||
// Returns the string "OK" for success.
|
||||
// std::string ToString() const;
|
||||
|
|
Loading…
Reference in New Issue