mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-25 22:44:05 +00:00
Add a MultiRead() method to Env (#5311)
Summary: Define the Env:: MultiRead() method to allow callers to request multiple block reads in one shot. The underlying Env implementation can parallelize it if it chooses to in order to reduce the overall IO latency. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5311 Differential Revision: D15502172 Pulled By: anand1976 fbshipit-source-id: 2b228269c2e11b5f54694d6b2bb3119c8a8ce2b9
This commit is contained in:
parent
227b5d52df
commit
0153e14569
53
env/env_test.cc
vendored
53
env/env_test.cc
vendored
|
@ -1105,6 +1105,59 @@ TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_P(EnvPosixTestWithParam, MultiRead) {
|
||||
EnvOptions soptions;
|
||||
soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
|
||||
std::string fname = test::PerThreadDBPath(env_, "testfile");
|
||||
|
||||
const size_t kSectorSize = 4096;
|
||||
const size_t kNumSectors = 8;
|
||||
|
||||
// Create file.
|
||||
{
|
||||
std::unique_ptr<WritableFile> wfile;
|
||||
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
|
||||
if (soptions.use_direct_writes) {
|
||||
soptions.use_direct_writes = false;
|
||||
}
|
||||
#endif
|
||||
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
|
||||
for (size_t i = 0; i < kNumSectors; ++i) {
|
||||
auto data = NewAligned(kSectorSize * 8, static_cast<const char>(i + 1));
|
||||
Slice slice(data.get(), kSectorSize);
|
||||
ASSERT_OK(wfile->Append(slice));
|
||||
}
|
||||
ASSERT_OK(wfile->Close());
|
||||
}
|
||||
|
||||
// Random Read
|
||||
{
|
||||
std::unique_ptr<RandomAccessFile> file;
|
||||
std::vector<ReadRequest> reqs(3);
|
||||
std::vector<std::unique_ptr<char, Deleter>> data;
|
||||
uint64_t offset = 0;
|
||||
for (size_t i = 0; i < reqs.size(); ++i) {
|
||||
reqs[i].offset = offset;
|
||||
offset += 2 * kSectorSize;
|
||||
reqs[i].len = kSectorSize;
|
||||
data.emplace_back(NewAligned(kSectorSize, 0));
|
||||
reqs[i].scratch = data.back().get();
|
||||
}
|
||||
#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
|
||||
if (soptions.use_direct_reads) {
|
||||
soptions.use_direct_reads = false;
|
||||
}
|
||||
#endif
|
||||
ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
|
||||
ASSERT_OK(file->MultiRead(reqs.data(), reqs.size()));
|
||||
for (size_t i = 0; i < reqs.size(); ++i) {
|
||||
auto buf = NewAligned(kSectorSize * 8, static_cast<const char>(i*2 + 1));
|
||||
ASSERT_OK(reqs[i].status);
|
||||
ASSERT_EQ(memcmp(reqs[i].scratch, buf.get(), kSectorSize), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only works in linux platforms
|
||||
#ifdef OS_WIN
|
||||
TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {
|
||||
|
|
|
@ -583,6 +583,26 @@ class SequentialFile {
|
|||
// SequentialFileWrapper too.
|
||||
};
|
||||
|
||||
// A read IO request structure for use in MultiRead
|
||||
struct ReadRequest {
|
||||
// File offset in bytes
|
||||
uint64_t offset;
|
||||
|
||||
// Length to read in bytes
|
||||
size_t len;
|
||||
|
||||
// A buffer that MultiRead() can optionally place data in. It can
|
||||
// ignore this and allocate its own buffer
|
||||
char* scratch;
|
||||
|
||||
// Output parameter set by MultiRead() to point to the data buffer, and
|
||||
// the number of valid bytes
|
||||
Slice result;
|
||||
|
||||
// Status of read
|
||||
Status status;
|
||||
};
|
||||
|
||||
// A file abstraction for randomly reading the contents of a file.
|
||||
class RandomAccessFile {
|
||||
public:
|
||||
|
@ -607,6 +627,22 @@ class RandomAccessFile {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
// Read a bunch of blocks as described by reqs. The blocks can
|
||||
// optionally be read in parallel. This is a synchronous call, i.e it
|
||||
// should return after all reads have completed. The reads will be
|
||||
// non-overlapping. If the function return Status is not ok, status of
|
||||
// individual requests will be ignored and return status will be assumed
|
||||
// for all read requests. The function return status is only meant for any
|
||||
// any errors that occur before even processing specific read requests
|
||||
virtual Status MultiRead(ReadRequest* reqs, size_t num_reqs) {
|
||||
assert(reqs != nullptr);
|
||||
for (size_t i = 0; i < num_reqs; ++i) {
|
||||
ReadRequest& req = reqs[i];
|
||||
req.status = Read(req.offset, req.len, &req.result, req.scratch);
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Tries to get an unique ID for this file that will be the same each time
|
||||
// the file is opened (and will stay the same while the file is open).
|
||||
// Furthermore, it tries to make this ID at most "max_size" bytes. If such an
|
||||
|
@ -1357,6 +1393,9 @@ class RandomAccessFileWrapper : public RandomAccessFile {
|
|||
char* scratch) const override {
|
||||
return target_->Read(offset, n, result, scratch);
|
||||
}
|
||||
Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
|
||||
return target_->MultiRead(reqs, num_reqs);
|
||||
}
|
||||
Status Prefetch(uint64_t offset, size_t n) override {
|
||||
return target_->Prefetch(offset, n);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue