mirror of https://github.com/facebook/rocksdb.git
PosixRandomAccessFile::MultiRead() to use I/O uring if supported (#5881)
Summary: Right now, PosixRandomAccessFile::MultiRead() executes read requests in parallel. In this PR, it leverages I/O Uring library to run it in parallel, even when page cache is enabled. This function will fall back if the kernel version doesn't support it. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5881 Test Plan: Run the unit test on a kernel version supporting it and make sure all tests pass, and run a unit test on kernel version supporting it and see it pass. Before merging, will also run stress test and see it passes. Differential Revision: D17742266 fbshipit-source-id: e05699c925ac04fdb42379456a4e23e4ebcb803a
This commit is contained in:
parent
6db57bc37f
commit
e3a82bb934
3
TARGETS
3
TARGETS
|
@ -26,6 +26,7 @@ ROCKSDB_EXTERNAL_DEPS = [
|
|||
("lz4", None, "lz4"),
|
||||
("zstd", None),
|
||||
("tbb", None),
|
||||
("liburing", None, "uring"),
|
||||
("googletest", None, "gtest"),
|
||||
]
|
||||
|
||||
|
@ -46,6 +47,7 @@ ROCKSDB_OS_PREPROCESSOR_FLAGS = [
|
|||
"-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX",
|
||||
"-DROCKSDB_RANGESYNC_PRESENT",
|
||||
"-DROCKSDB_SCHED_GETCPU_PRESENT",
|
||||
"-DROCKSDB_IOURING_PRESENT",
|
||||
"-DHAVE_SSE42",
|
||||
"-DNUMA",
|
||||
],
|
||||
|
@ -70,6 +72,7 @@ ROCKSDB_PREPROCESSOR_FLAGS = [
|
|||
"-DZSTD_STATIC_LINKING_ONLY",
|
||||
"-DGFLAGS=gflags",
|
||||
"-DTBB",
|
||||
"-DLIBURING",
|
||||
|
||||
# Added missing flags from output of build_detect_platform
|
||||
"-DROCKSDB_BACKTRACE",
|
||||
|
|
|
@ -32,6 +32,7 @@ ROCKSDB_EXTERNAL_DEPS = [
|
|||
("lz4", None, "lz4"),
|
||||
("zstd", None),
|
||||
("tbb", None),
|
||||
("liburing", None, "uring"),
|
||||
("googletest", None, "gtest"),
|
||||
]
|
||||
|
||||
|
@ -52,6 +53,7 @@ ROCKSDB_OS_PREPROCESSOR_FLAGS = [
|
|||
"-DROCKSDB_PTHREAD_ADAPTIVE_MUTEX",
|
||||
"-DROCKSDB_RANGESYNC_PRESENT",
|
||||
"-DROCKSDB_SCHED_GETCPU_PRESENT",
|
||||
"-DROCKSDB_IOURING_PRESENT",
|
||||
"-DHAVE_SSE42",
|
||||
"-DNUMA",
|
||||
],
|
||||
|
@ -76,6 +78,7 @@ ROCKSDB_PREPROCESSOR_FLAGS = [
|
|||
"-DZSTD_STATIC_LINKING_ONLY",
|
||||
"-DGFLAGS=gflags",
|
||||
"-DTBB",
|
||||
"-DLIBURING",
|
||||
|
||||
# Added missing flags from output of build_detect_platform
|
||||
"-DROCKSDB_BACKTRACE",
|
||||
|
|
|
@ -150,6 +150,19 @@ case "$TARGET_OS" in
|
|||
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -latomic"
|
||||
fi
|
||||
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lpthread -lrt"
|
||||
# check for liburing
|
||||
$CXX $CFLAGS -x c++ - -luring -o /dev/null 2>/dev/null <<EOF
|
||||
#include <liburing.h>
|
||||
int main() {
|
||||
struct io_uring ring;
|
||||
io_uring_queue_init(1, &ring, 0);
|
||||
return 0;
|
||||
}
|
||||
EOF
|
||||
if [ "$?" = 0 ]; then
|
||||
PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -luring"
|
||||
COMMON_FLAGS="$COMMON_FLAGS -DROCKSDB_IOURING_PRESENT"
|
||||
fi
|
||||
if test -z "$USE_FOLLY_DISTRIBUTED_MUTEX"; then
|
||||
USE_FOLLY_DISTRIBUTED_MUTEX=1
|
||||
fi
|
||||
|
|
|
@ -13,6 +13,7 @@ JEMALLOC_BASE=/mnt/gvfs/third-party2/jemalloc/c26f08f47ac35fc31da2633b7da92d6b86
|
|||
NUMA_BASE=/mnt/gvfs/third-party2/numa/3f3fb57a5ccc5fd21c66416c0b83e0aa76a05376/2.0.11/platform007/ca4da3d
|
||||
LIBUNWIND_BASE=/mnt/gvfs/third-party2/libunwind/40c73d874898b386a71847f1b99115d93822d11f/1.4/platform007/6f3e0a9
|
||||
TBB_BASE=/mnt/gvfs/third-party2/tbb/4ce8e8dba77cdbd81b75d6f0c32fd7a1b76a11ec/2018_U5/platform007/ca4da3d
|
||||
LIBURING_BASE=/mnt/gvfs/third-party2/liburing/79427253fd0d42677255aacfe6d13bfe63f752eb/20190828/platform007/ca4da3d
|
||||
KERNEL_HEADERS_BASE=/mnt/gvfs/third-party2/kernel-headers/fb251ecd2f5ae16f8671f7014c246e52a748fe0b/fb/platform007/da39a3e
|
||||
BINUTILS_BASE=/mnt/gvfs/third-party2/binutils/ab9f09bba370e7066cafd4eb59752db93f2e8312/2.29.1/platform007/15a3614
|
||||
VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/d42d152a15636529b0861ec493927200ebebca8e/3.15.0/platform007/ca4da3d
|
||||
|
|
|
@ -86,6 +86,15 @@ else
|
|||
fi
|
||||
CFLAGS+=" -DTBB"
|
||||
|
||||
# location of LIBURING
|
||||
LIBURING_INCLUDE=" -isystem $LIBURING_BASE/include/"
|
||||
if test -z $PIC_BUILD; then
|
||||
LIBURING_LIBS="$LIBURING_BASE/lib/liburing.a"
|
||||
else
|
||||
LIBURING_LIBS="$LIBURING_BASE/lib/liburing_pic.a"
|
||||
fi
|
||||
CFLAGS+=" -DLIBURING"
|
||||
|
||||
test "$USE_SSE" || USE_SSE=1
|
||||
export USE_SSE
|
||||
test "$PORTABLE" || PORTABLE=1
|
||||
|
@ -94,7 +103,7 @@ export PORTABLE
|
|||
BINUTILS="$BINUTILS_BASE/bin"
|
||||
AR="$BINUTILS/ar"
|
||||
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE"
|
||||
DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE $LIBURING_INCLUDE"
|
||||
|
||||
STDLIBS="-L $GCC_BASE/lib64"
|
||||
|
||||
|
@ -135,10 +144,10 @@ else
|
|||
fi
|
||||
|
||||
CFLAGS+=" $DEPS_INCLUDE"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42"
|
||||
CFLAGS+=" -DROCKSDB_PLATFORM_POSIX -DROCKSDB_LIB_IO_POSIX -DROCKSDB_FALLOCATE_PRESENT -DROCKSDB_MALLOC_USABLE_SIZE -DROCKSDB_RANGESYNC_PRESENT -DROCKSDB_SCHED_GETCPU_PRESENT -DROCKSDB_SUPPORT_THREAD_LOCAL -DHAVE_SSE42 -DROCKSDB_IOURING_PRESENT"
|
||||
CXXFLAGS+=" $CFLAGS"
|
||||
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS"
|
||||
EXEC_LDFLAGS=" $SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $NUMA_LIB $TBB_LIBS $LIBURING_LIBS"
|
||||
EXEC_LDFLAGS+=" -B$BINUTILS/gold"
|
||||
EXEC_LDFLAGS+=" -Wl,--dynamic-linker,/usr/local/fbcode/platform007/lib/ld.so"
|
||||
EXEC_LDFLAGS+=" $LIBUNWIND"
|
||||
|
@ -148,7 +157,7 @@ EXEC_LDFLAGS+=" -ldl"
|
|||
|
||||
PLATFORM_LDFLAGS="$LIBGCC_LIBS $GLIBC_LIBS $STDLIBS -lgcc -lstdc++"
|
||||
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS"
|
||||
EXEC_LDFLAGS_SHARED="$SNAPPY_LIBS $ZLIB_LIBS $BZIP_LIBS $LZ4_LIBS $ZSTD_LIBS $GFLAGS_LIBS $TBB_LIBS $LIBURING_LIBS"
|
||||
|
||||
VALGRIND_VER="$VALGRIND_BASE/bin/"
|
||||
|
||||
|
|
|
@ -92,6 +92,7 @@ get_lib_base jemalloc LATEST platform007
|
|||
get_lib_base numa LATEST platform007
|
||||
get_lib_base libunwind LATEST platform007
|
||||
get_lib_base tbb LATEST platform007
|
||||
get_lib_base liburing LATEST platform007
|
||||
|
||||
get_lib_base kernel-headers fb platform007
|
||||
get_lib_base binutils LATEST centos7-native
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
#if defined(OS_LINUX)
|
||||
#include <linux/fs.h>
|
||||
#endif
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
#include <liburing.h>
|
||||
#endif
|
||||
#include <pthread.h>
|
||||
#include <signal.h>
|
||||
#include <stdio.h>
|
||||
|
@ -32,6 +35,9 @@
|
|||
#include <sys/statvfs.h>
|
||||
#include <sys/time.h>
|
||||
#include <sys/types.h>
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
#include <sys/uio.h>
|
||||
#endif
|
||||
#include <time.h>
|
||||
#include <algorithm>
|
||||
// Get nano time includes
|
||||
|
@ -286,7 +292,12 @@ class PosixEnv : public Env {
|
|||
}
|
||||
#endif
|
||||
}
|
||||
result->reset(new PosixRandomAccessFile(fname, fd, options));
|
||||
result->reset(new PosixRandomAccessFile(fname, fd, options
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
,
|
||||
thread_local_io_urings_.get()
|
||||
#endif
|
||||
));
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -1105,6 +1116,11 @@ class PosixEnv : public Env {
|
|||
#endif
|
||||
}
|
||||
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
// io_uring instance
|
||||
std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
|
||||
#endif
|
||||
|
||||
size_t page_size_;
|
||||
|
||||
std::vector<ThreadPoolImpl> thread_pools_;
|
||||
|
@ -1129,6 +1145,17 @@ PosixEnv::PosixEnv()
|
|||
thread_pools_[pool_id].SetHostEnv(this);
|
||||
}
|
||||
thread_status_updater_ = CreateThreadStatusUpdater();
|
||||
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
// Test whether IOUring is supported, and if it does, create a managing
|
||||
// object for thread local point so that in the future thread-local
|
||||
// io_uring can be created.
|
||||
struct io_uring* new_io_uring = CreateIOUring();
|
||||
if (new_io_uring != nullptr) {
|
||||
thread_local_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
|
||||
delete new_io_uring;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "port/port.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "test_util/sync_point.h"
|
||||
#include "util/autovector.h"
|
||||
#include "util/coding.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
|
@ -409,12 +410,22 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
|
|||
*
|
||||
* pread() based random-access
|
||||
*/
|
||||
PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
|
||||
const EnvOptions& options)
|
||||
PosixRandomAccessFile::PosixRandomAccessFile(
|
||||
const std::string& fname, int fd, const EnvOptions& options
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
,
|
||||
ThreadLocalPtr* thread_local_io_urings
|
||||
#endif
|
||||
)
|
||||
: filename_(fname),
|
||||
fd_(fd),
|
||||
use_direct_io_(options.use_direct_reads),
|
||||
logical_sector_size_(GetLogicalBufferSize(fd_)) {
|
||||
logical_sector_size_(GetLogicalBufferSize(fd_))
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
,
|
||||
thread_local_io_urings_(thread_local_io_urings)
|
||||
#endif
|
||||
{
|
||||
assert(!options.use_direct_reads || !options.use_mmap_reads);
|
||||
assert(!options.use_mmap_reads || sizeof(void*) < 8);
|
||||
}
|
||||
|
@ -460,6 +471,96 @@ Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
|
|||
return s;
|
||||
}
|
||||
|
||||
Status PosixRandomAccessFile::MultiRead(ReadRequest* reqs, size_t num_reqs) {
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
size_t reqs_off;
|
||||
ssize_t ret __attribute__((__unused__));
|
||||
|
||||
struct io_uring* iu = nullptr;
|
||||
if (thread_local_io_urings_) {
|
||||
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
|
||||
if (iu == nullptr) {
|
||||
iu = CreateIOUring();
|
||||
if (iu != nullptr) {
|
||||
thread_local_io_urings_->Reset(iu);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Init failed, platform doesn't support io_uring. Fall back to
|
||||
// serialized reads
|
||||
if (iu == nullptr) {
|
||||
return RandomAccessFile::MultiRead(reqs, num_reqs);
|
||||
}
|
||||
|
||||
struct WrappedReadRequest {
|
||||
ReadRequest* req;
|
||||
struct iovec iov;
|
||||
explicit WrappedReadRequest(ReadRequest* r) : req(r) {}
|
||||
};
|
||||
|
||||
autovector<WrappedReadRequest, 32> req_wraps;
|
||||
|
||||
for (size_t i = 0; i < num_reqs; i++) {
|
||||
req_wraps.emplace_back(&reqs[i]);
|
||||
}
|
||||
|
||||
reqs_off = 0;
|
||||
while (num_reqs) {
|
||||
size_t this_reqs = num_reqs;
|
||||
|
||||
// If requests exceed depth, split it into batches
|
||||
if (this_reqs > kIoUringDepth) this_reqs = kIoUringDepth;
|
||||
|
||||
for (size_t i = 0; i < this_reqs; i++) {
|
||||
size_t index = i + reqs_off;
|
||||
struct io_uring_sqe* sqe;
|
||||
|
||||
sqe = io_uring_get_sqe(iu);
|
||||
req_wraps[index].iov.iov_base = reqs[index].scratch;
|
||||
req_wraps[index].iov.iov_len = reqs[index].len;
|
||||
reqs[index].result = reqs[index].scratch;
|
||||
io_uring_prep_readv(sqe, fd_, &req_wraps[index].iov, 1,
|
||||
reqs[index].offset);
|
||||
io_uring_sqe_set_data(sqe, &req_wraps[index]);
|
||||
}
|
||||
|
||||
ret = io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
|
||||
if (static_cast<size_t>(ret) != this_reqs) {
|
||||
fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
|
||||
}
|
||||
assert(static_cast<size_t>(ret) == this_reqs);
|
||||
|
||||
for (size_t i = 0; i < this_reqs; i++) {
|
||||
struct io_uring_cqe* cqe;
|
||||
WrappedReadRequest* req_wrap;
|
||||
|
||||
// We could use the peek variant here, but this seems safer in terms
|
||||
// of our initial wait not reaping all completions
|
||||
ret = io_uring_wait_cqe(iu, &cqe);
|
||||
assert(!ret);
|
||||
req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
|
||||
ReadRequest* req = req_wrap->req;
|
||||
if (static_cast<size_t>(cqe->res) == req_wrap->iov.iov_len) {
|
||||
req->result = Slice(req->scratch, cqe->res);
|
||||
req->status = Status::OK();
|
||||
} else if (cqe->res >= 0) {
|
||||
req->result = Slice(req->scratch, req_wrap->iov.iov_len - cqe->res);
|
||||
} else {
|
||||
req->result = Slice(req->scratch, 0);
|
||||
req->status = IOError("Req failed", filename_, cqe->res);
|
||||
}
|
||||
io_uring_cqe_seen(iu, cqe);
|
||||
}
|
||||
num_reqs -= this_reqs;
|
||||
reqs_off += this_reqs;
|
||||
}
|
||||
return Status::OK();
|
||||
#else
|
||||
return RandomAccessFile::MultiRead(reqs, num_reqs);
|
||||
#endif
|
||||
}
|
||||
|
||||
Status PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n) {
|
||||
Status s;
|
||||
if (!use_direct_io()) {
|
||||
|
|
|
@ -8,10 +8,15 @@
|
|||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#pragma once
|
||||
#include <errno.h>
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
#include <liburing.h>
|
||||
#include <sys/uio.h>
|
||||
#endif
|
||||
#include <unistd.h>
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include "rocksdb/env.h"
|
||||
#include "util/thread_local.h"
|
||||
|
||||
// For non linux platform, the following macros are used only as place
|
||||
// holder.
|
||||
|
@ -79,21 +84,51 @@ class PosixSequentialFile : public SequentialFile {
|
|||
}
|
||||
};
|
||||
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
// io_uring instance queue depth
|
||||
const unsigned int kIoUringDepth = 256;
|
||||
|
||||
inline void DeleteIOUring(void* p) {
|
||||
struct io_uring* iu = static_cast<struct io_uring*>(p);
|
||||
delete iu;
|
||||
}
|
||||
|
||||
inline struct io_uring* CreateIOUring() {
|
||||
struct io_uring* new_io_uring = new struct io_uring;
|
||||
int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, 0);
|
||||
if (ret) {
|
||||
delete new_io_uring;
|
||||
new_io_uring = nullptr;
|
||||
}
|
||||
return new_io_uring;
|
||||
}
|
||||
#endif // defined(ROCKSDB_IOURING_PRESENT)
|
||||
|
||||
class PosixRandomAccessFile : public RandomAccessFile {
|
||||
protected:
|
||||
std::string filename_;
|
||||
int fd_;
|
||||
bool use_direct_io_;
|
||||
size_t logical_sector_size_;
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
ThreadLocalPtr* thread_local_io_urings_;
|
||||
#endif
|
||||
|
||||
public:
|
||||
PosixRandomAccessFile(const std::string& fname, int fd,
|
||||
const EnvOptions& options);
|
||||
const EnvOptions& options
|
||||
#if defined(ROCKSDB_IOURING_PRESENT)
|
||||
,
|
||||
ThreadLocalPtr* thread_local_io_urings
|
||||
#endif
|
||||
);
|
||||
virtual ~PosixRandomAccessFile();
|
||||
|
||||
virtual Status Read(uint64_t offset, size_t n, Slice* result,
|
||||
char* scratch) const override;
|
||||
|
||||
virtual Status MultiRead(ReadRequest* reqs, size_t num_reqs) override;
|
||||
|
||||
virtual Status Prefetch(uint64_t offset, size_t n) override;
|
||||
|
||||
#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
|
||||
|
|
Loading…
Reference in New Issue