mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 02:44:18 +00:00
458acf8169
Summary: Some repro unit tests for the bug fixed in https://github.com/facebook/rocksdb/pull/11782. Ran on main without https://github.com/facebook/rocksdb/pull/11782: ``` ./db_compaction_test --gtest_filter='*ErrorWhenReadFileHead' Note: Google Test filter = *ErrorWhenReadFileHead [==========] Running 1 test from 1 test case. [----------] Global test environment set-up. [----------] 1 test from DBCompactionTest [ RUN ] DBCompactionTest.ErrorWhenReadFileHead db/db_compaction_test.cc:10105: Failure Value of: s.IsIOError() Actual: false Expected: true [ FAILED ] DBCompactionTest.ErrorWhenReadFileHead (3960 ms) ./db_iterator_test --gtest_filter="*ErrorWhenReadFile*" Note: Google Test filter = *ErrorWhenReadFile* [==========] Running 1 test from 1 test case. [----------] Global test environment set-up. [----------] 1 test from DBIteratorTest [ RUN ] DBIteratorTest.ErrorWhenReadFile db/db_iterator_test.cc:3399: Failure Value of: (iter->status()).ok() Actual: true Expected: false [ FAILED ] DBIteratorTest.ErrorWhenReadFile (280 ms) [----------] 1 test from DBIteratorTest (280 ms total) ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/11788 Reviewed By: ajkr Differential Revision: D48940284 Pulled By: cbi42 fbshipit-source-id: 06f3c5963f576db3f85d305ffb2745ee13d209bb
639 lines
24 KiB
C++
639 lines
24 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "file/random_access_file_reader.h"
|
|
|
|
#include <algorithm>
|
|
#include <mutex>
|
|
|
|
#include "file/file_util.h"
|
|
#include "monitoring/histogram.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "port/port.h"
|
|
#include "table/format.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/random.h"
|
|
#include "util/rate_limiter_impl.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
inline Histograms GetFileReadHistograms(Statistics* stats,
|
|
Env::IOActivity io_activity) {
|
|
switch (io_activity) {
|
|
case Env::IOActivity::kFlush:
|
|
return Histograms::FILE_READ_FLUSH_MICROS;
|
|
case Env::IOActivity::kCompaction:
|
|
return Histograms::FILE_READ_COMPACTION_MICROS;
|
|
case Env::IOActivity::kDBOpen:
|
|
return Histograms::FILE_READ_DB_OPEN_MICROS;
|
|
default:
|
|
break;
|
|
}
|
|
|
|
if (stats && stats->get_stats_level() > StatsLevel::kExceptDetailedTimers) {
|
|
switch (io_activity) {
|
|
case Env::IOActivity::kGet:
|
|
return Histograms::FILE_READ_GET_MICROS;
|
|
case Env::IOActivity::kMultiGet:
|
|
return Histograms::FILE_READ_MULTIGET_MICROS;
|
|
case Env::IOActivity::kDBIterator:
|
|
return Histograms::FILE_READ_DB_ITERATOR_MICROS;
|
|
case Env::IOActivity::kVerifyDBChecksum:
|
|
return Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS;
|
|
case Env::IOActivity::kVerifyFileChecksums:
|
|
return Histograms::FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
return Histograms::HISTOGRAM_ENUM_MAX;
|
|
}
|
|
inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
|
|
bool is_last_level, size_t size) {
|
|
IOSTATS_ADD(bytes_read, size);
|
|
// record for last/non-last level
|
|
if (is_last_level) {
|
|
RecordTick(stats, LAST_LEVEL_READ_BYTES, size);
|
|
RecordTick(stats, LAST_LEVEL_READ_COUNT, 1);
|
|
} else {
|
|
RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size);
|
|
RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1);
|
|
}
|
|
|
|
// record for temperature file
|
|
if (file_temperature != Temperature::kUnknown) {
|
|
switch (file_temperature) {
|
|
case Temperature::kHot:
|
|
IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size);
|
|
IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1);
|
|
RecordTick(stats, HOT_FILE_READ_BYTES, size);
|
|
RecordTick(stats, HOT_FILE_READ_COUNT, 1);
|
|
break;
|
|
case Temperature::kWarm:
|
|
IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size);
|
|
IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1);
|
|
RecordTick(stats, WARM_FILE_READ_BYTES, size);
|
|
RecordTick(stats, WARM_FILE_READ_COUNT, 1);
|
|
break;
|
|
case Temperature::kCold:
|
|
IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size);
|
|
IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1);
|
|
RecordTick(stats, COLD_FILE_READ_BYTES, size);
|
|
RecordTick(stats, COLD_FILE_READ_COUNT, 1);
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
IOStatus RandomAccessFileReader::Create(
|
|
const std::shared_ptr<FileSystem>& fs, const std::string& fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
|
|
std::unique_ptr<FSRandomAccessFile> file;
|
|
IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
|
|
if (io_s.ok()) {
|
|
reader->reset(new RandomAccessFileReader(std::move(file), fname));
|
|
}
|
|
return io_s;
|
|
}
|
|
|
|
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
|
|
size_t n, Slice* result, char* scratch,
|
|
AlignedBuf* aligned_buf) const {
|
|
(void)aligned_buf;
|
|
const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;
|
|
|
|
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
|
|
|
|
// To be paranoid: modify scratch a little bit, so in case underlying
|
|
// FileSystem doesn't fill the buffer but return success and `scratch` returns
|
|
// contains a previous block, returned value will not pass checksum.
|
|
if (n > 0 && scratch != nullptr) {
|
|
// This byte might not change anything for direct I/O case, but it's OK.
|
|
scratch[0]++;
|
|
}
|
|
|
|
IOStatus io_s;
|
|
uint64_t elapsed = 0;
|
|
size_t alignment = file_->GetRequiredBufferAlignment();
|
|
bool is_aligned = false;
|
|
if (scratch != nullptr) {
|
|
// Check if offset, length and buffer are aligned.
|
|
is_aligned = (offset & (alignment - 1)) == 0 &&
|
|
(n & (alignment - 1)) == 0 &&
|
|
(uintptr_t(scratch) & (alignment - 1)) == 0;
|
|
}
|
|
|
|
{
|
|
StopWatch sw(clock_, stats_, hist_type_,
|
|
GetFileReadHistograms(stats_, opts.io_activity),
|
|
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
|
|
true /*delay_enabled*/);
|
|
auto prev_perf_level = GetPerfLevel();
|
|
IOSTATS_TIMER_GUARD(read_nanos);
|
|
if (use_direct_io() && is_aligned == false) {
|
|
size_t aligned_offset =
|
|
TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
|
|
size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
|
|
size_t read_size =
|
|
Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
|
|
AlignedBuffer buf;
|
|
buf.Alignment(alignment);
|
|
buf.AllocateNewBuffer(read_size);
|
|
while (buf.CurrentSize() < read_size) {
|
|
size_t allowed;
|
|
if (rate_limiter_priority != Env::IO_TOTAL &&
|
|
rate_limiter_ != nullptr) {
|
|
allowed = rate_limiter_->RequestToken(
|
|
buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
|
|
rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
|
|
} else {
|
|
assert(buf.CurrentSize() == 0);
|
|
allowed = read_size;
|
|
}
|
|
Slice tmp;
|
|
|
|
FileOperationInfo::StartTimePoint start_ts;
|
|
uint64_t orig_offset = 0;
|
|
if (ShouldNotifyListeners()) {
|
|
start_ts = FileOperationInfo::StartNow();
|
|
orig_offset = aligned_offset + buf.CurrentSize();
|
|
}
|
|
|
|
{
|
|
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
|
|
// Only user reads are expected to specify a timeout. And user reads
|
|
// are not subjected to rate_limiter and should go through only
|
|
// one iteration of this loop, so we don't need to check and adjust
|
|
// the opts.timeout before calling file_->Read
|
|
assert(!opts.timeout.count() || allowed == read_size);
|
|
io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
|
|
&tmp, buf.Destination(), nullptr);
|
|
}
|
|
if (ShouldNotifyListeners()) {
|
|
auto finish_ts = FileOperationInfo::FinishNow();
|
|
NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
|
|
io_s);
|
|
if (!io_s.ok()) {
|
|
NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
|
|
tmp.size(), orig_offset);
|
|
}
|
|
}
|
|
|
|
buf.Size(buf.CurrentSize() + tmp.size());
|
|
if (!io_s.ok() || tmp.size() < allowed) {
|
|
break;
|
|
}
|
|
}
|
|
size_t res_len = 0;
|
|
if (io_s.ok() && offset_advance < buf.CurrentSize()) {
|
|
res_len = std::min(buf.CurrentSize() - offset_advance, n);
|
|
if (aligned_buf == nullptr) {
|
|
buf.Read(scratch, offset_advance, res_len);
|
|
} else {
|
|
scratch = buf.BufferStart() + offset_advance;
|
|
aligned_buf->reset(buf.Release());
|
|
}
|
|
}
|
|
*result = Slice(scratch, res_len);
|
|
} else {
|
|
size_t pos = 0;
|
|
const char* res_scratch = nullptr;
|
|
while (pos < n) {
|
|
size_t allowed;
|
|
if (rate_limiter_priority != Env::IO_TOTAL &&
|
|
rate_limiter_ != nullptr) {
|
|
if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
|
|
sw.DelayStart();
|
|
}
|
|
allowed = rate_limiter_->RequestToken(
|
|
n - pos, (use_direct_io() ? alignment : 0), rate_limiter_priority,
|
|
stats_, RateLimiter::OpType::kRead);
|
|
if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
|
|
sw.DelayStop();
|
|
}
|
|
} else {
|
|
allowed = n;
|
|
}
|
|
Slice tmp_result;
|
|
|
|
FileOperationInfo::StartTimePoint start_ts;
|
|
if (ShouldNotifyListeners()) {
|
|
start_ts = FileOperationInfo::StartNow();
|
|
}
|
|
|
|
{
|
|
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
|
|
// Only user reads are expected to specify a timeout. And user reads
|
|
// are not subjected to rate_limiter and should go through only
|
|
// one iteration of this loop, so we don't need to check and adjust
|
|
// the opts.timeout before calling file_->Read
|
|
assert(!opts.timeout.count() || allowed == n);
|
|
io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
|
|
scratch + pos, nullptr);
|
|
}
|
|
if (ShouldNotifyListeners()) {
|
|
auto finish_ts = FileOperationInfo::FinishNow();
|
|
NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
|
|
finish_ts, io_s);
|
|
|
|
if (!io_s.ok()) {
|
|
NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
|
|
tmp_result.size(), offset + pos);
|
|
}
|
|
}
|
|
if (res_scratch == nullptr) {
|
|
// we can't simply use `scratch` because reads of mmap'd files return
|
|
// data in a different buffer.
|
|
res_scratch = tmp_result.data();
|
|
} else {
|
|
// make sure chunks are inserted contiguously into `res_scratch`.
|
|
assert(tmp_result.data() == res_scratch + pos);
|
|
}
|
|
pos += tmp_result.size();
|
|
if (!io_s.ok() || tmp_result.size() < allowed) {
|
|
break;
|
|
}
|
|
}
|
|
*result = Slice(res_scratch, io_s.ok() ? pos : 0);
|
|
}
|
|
RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
|
|
SetPerfLevel(prev_perf_level);
|
|
}
|
|
if (stats_ != nullptr && file_read_hist_ != nullptr) {
|
|
file_read_hist_->Add(elapsed);
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
auto pair = std::make_pair(&file_name_, &io_s);
|
|
if (offset == 0) {
|
|
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::BeforeReturn",
|
|
&pair);
|
|
}
|
|
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::AnyOffset", &pair);
|
|
#endif
|
|
return io_s;
|
|
}
|
|
|
|
size_t End(const FSReadRequest& r) {
|
|
return static_cast<size_t>(r.offset) + r.len;
|
|
}
|
|
|
|
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
|
|
FSReadRequest req;
|
|
req.offset = static_cast<uint64_t>(
|
|
TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
|
|
req.len = Roundup(End(r), alignment) - req.offset;
|
|
req.scratch = nullptr;
|
|
return req;
|
|
}
|
|
|
|
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
|
|
size_t dest_offset = static_cast<size_t>(dest->offset);
|
|
size_t src_offset = static_cast<size_t>(src.offset);
|
|
size_t dest_end = End(*dest);
|
|
size_t src_end = End(src);
|
|
if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
|
|
return false;
|
|
}
|
|
dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
|
|
dest->len = std::max(dest_end, src_end) - dest->offset;
|
|
return true;
|
|
}
|
|
|
|
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
|
|
FSReadRequest* read_reqs,
|
|
size_t num_reqs,
|
|
AlignedBuf* aligned_buf) const {
|
|
(void)aligned_buf; // suppress warning of unused variable in LITE mode
|
|
assert(num_reqs > 0);
|
|
|
|
#ifndef NDEBUG
|
|
for (size_t i = 0; i < num_reqs - 1; ++i) {
|
|
assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
|
|
}
|
|
#endif // !NDEBUG
|
|
const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;
|
|
|
|
// To be paranoid modify scratch a little bit, so in case underlying
|
|
// FileSystem doesn't fill the buffer but return success and `scratch` returns
|
|
// contains a previous block, returned value will not pass checksum.
|
|
// This byte might not change anything for direct I/O case, but it's OK.
|
|
for (size_t i = 0; i < num_reqs; i++) {
|
|
FSReadRequest& r = read_reqs[i];
|
|
if (r.len > 0 && r.scratch != nullptr) {
|
|
r.scratch[0]++;
|
|
}
|
|
}
|
|
|
|
IOStatus io_s;
|
|
uint64_t elapsed = 0;
|
|
{
|
|
StopWatch sw(clock_, stats_, hist_type_,
|
|
GetFileReadHistograms(stats_, opts.io_activity),
|
|
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
|
|
true /*delay_enabled*/);
|
|
auto prev_perf_level = GetPerfLevel();
|
|
IOSTATS_TIMER_GUARD(read_nanos);
|
|
|
|
FSReadRequest* fs_reqs = read_reqs;
|
|
size_t num_fs_reqs = num_reqs;
|
|
std::vector<FSReadRequest> aligned_reqs;
|
|
if (use_direct_io()) {
|
|
// num_reqs is the max possible size,
|
|
// this can reduce std::vecector's internal resize operations.
|
|
aligned_reqs.reserve(num_reqs);
|
|
// Align and merge the read requests.
|
|
size_t alignment = file_->GetRequiredBufferAlignment();
|
|
for (size_t i = 0; i < num_reqs; i++) {
|
|
FSReadRequest r = Align(read_reqs[i], alignment);
|
|
if (i == 0) {
|
|
// head
|
|
aligned_reqs.push_back(std::move(r));
|
|
|
|
} else if (!TryMerge(&aligned_reqs.back(), r)) {
|
|
// head + n
|
|
aligned_reqs.push_back(std::move(r));
|
|
|
|
} else {
|
|
// unused
|
|
r.status.PermitUncheckedError();
|
|
}
|
|
}
|
|
TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
|
|
&aligned_reqs);
|
|
|
|
// Allocate aligned buffer and let scratch buffers point to it.
|
|
size_t total_len = 0;
|
|
for (const auto& r : aligned_reqs) {
|
|
total_len += r.len;
|
|
}
|
|
AlignedBuffer buf;
|
|
buf.Alignment(alignment);
|
|
buf.AllocateNewBuffer(total_len);
|
|
char* scratch = buf.BufferStart();
|
|
for (auto& r : aligned_reqs) {
|
|
r.scratch = scratch;
|
|
scratch += r.len;
|
|
}
|
|
|
|
aligned_buf->reset(buf.Release());
|
|
fs_reqs = aligned_reqs.data();
|
|
num_fs_reqs = aligned_reqs.size();
|
|
}
|
|
|
|
FileOperationInfo::StartTimePoint start_ts;
|
|
if (ShouldNotifyListeners()) {
|
|
start_ts = FileOperationInfo::StartNow();
|
|
}
|
|
|
|
{
|
|
IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
|
|
if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
|
|
// TODO: ideally we should call `RateLimiter::RequestToken()` for
|
|
// allowed bytes to multi-read and then consume those bytes by
|
|
// satisfying as many requests in `MultiRead()` as possible, instead of
|
|
// what we do here, which can cause burst when the
|
|
// `total_multi_read_size` is big.
|
|
size_t total_multi_read_size = 0;
|
|
assert(fs_reqs != nullptr);
|
|
for (size_t i = 0; i < num_fs_reqs; ++i) {
|
|
FSReadRequest& req = fs_reqs[i];
|
|
total_multi_read_size += req.len;
|
|
}
|
|
size_t remaining_bytes = total_multi_read_size;
|
|
size_t request_bytes = 0;
|
|
while (remaining_bytes > 0) {
|
|
request_bytes = std::min(
|
|
static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
|
|
remaining_bytes);
|
|
rate_limiter_->Request(request_bytes, rate_limiter_priority,
|
|
nullptr /* stats */,
|
|
RateLimiter::OpType::kRead);
|
|
remaining_bytes -= request_bytes;
|
|
}
|
|
}
|
|
io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
|
|
RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
|
|
}
|
|
|
|
if (use_direct_io()) {
|
|
// Populate results in the unaligned read requests.
|
|
size_t aligned_i = 0;
|
|
for (size_t i = 0; i < num_reqs; i++) {
|
|
auto& r = read_reqs[i];
|
|
if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
|
|
aligned_i++;
|
|
}
|
|
const auto& fs_r = fs_reqs[aligned_i];
|
|
r.status = fs_r.status;
|
|
if (r.status.ok()) {
|
|
uint64_t offset = r.offset - fs_r.offset;
|
|
if (fs_r.result.size() <= offset) {
|
|
// No byte in the read range is returned.
|
|
r.result = Slice();
|
|
} else {
|
|
size_t len = std::min(
|
|
r.len, static_cast<size_t>(fs_r.result.size() - offset));
|
|
r.result = Slice(fs_r.scratch + offset, len);
|
|
}
|
|
} else {
|
|
r.result = Slice();
|
|
}
|
|
}
|
|
}
|
|
|
|
for (size_t i = 0; i < num_reqs; ++i) {
|
|
if (ShouldNotifyListeners()) {
|
|
auto finish_ts = FileOperationInfo::FinishNow();
|
|
NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
|
|
start_ts, finish_ts, read_reqs[i].status);
|
|
}
|
|
if (!read_reqs[i].status.ok()) {
|
|
NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
|
|
file_name(), read_reqs[i].result.size(),
|
|
read_reqs[i].offset);
|
|
}
|
|
|
|
RecordIOStats(stats_, file_temperature_, is_last_level_,
|
|
read_reqs[i].result.size());
|
|
}
|
|
SetPerfLevel(prev_perf_level);
|
|
}
|
|
if (stats_ != nullptr && file_read_hist_ != nullptr) {
|
|
file_read_hist_->Add(elapsed);
|
|
}
|
|
|
|
return io_s;
|
|
}
|
|
|
|
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
|
|
IOOptions& opts) const {
|
|
if (clock_ != nullptr) {
|
|
return PrepareIOFromReadOptions(ro, clock_, opts);
|
|
} else {
|
|
return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
|
|
}
|
|
}
|
|
|
|
IOStatus RandomAccessFileReader::ReadAsync(
|
|
FSReadRequest& req, const IOOptions& opts,
|
|
std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
|
|
void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) {
|
|
IOStatus s;
|
|
// Create a callback and populate info.
|
|
auto read_async_callback =
|
|
std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
|
|
std::placeholders::_1, std::placeholders::_2);
|
|
ReadAsyncInfo* read_async_info =
|
|
new ReadAsyncInfo(cb, cb_arg, clock_->NowMicros());
|
|
|
|
if (ShouldNotifyListeners()) {
|
|
read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
|
|
}
|
|
|
|
size_t alignment = file_->GetRequiredBufferAlignment();
|
|
bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
|
|
(req.len & (alignment - 1)) == 0 &&
|
|
(uintptr_t(req.scratch) & (alignment - 1)) == 0;
|
|
read_async_info->is_aligned_ = is_aligned;
|
|
|
|
uint64_t elapsed = 0;
|
|
if (use_direct_io() && is_aligned == false) {
|
|
FSReadRequest aligned_req = Align(req, alignment);
|
|
aligned_req.status.PermitUncheckedError();
|
|
|
|
// Allocate aligned buffer.
|
|
read_async_info->buf_.Alignment(alignment);
|
|
read_async_info->buf_.AllocateNewBuffer(aligned_req.len);
|
|
|
|
// Set rem fields in aligned FSReadRequest.
|
|
aligned_req.scratch = read_async_info->buf_.BufferStart();
|
|
|
|
// Set user provided fields to populate back in callback.
|
|
read_async_info->user_scratch_ = req.scratch;
|
|
read_async_info->user_aligned_buf_ = aligned_buf;
|
|
read_async_info->user_len_ = req.len;
|
|
read_async_info->user_offset_ = req.offset;
|
|
read_async_info->user_result_ = req.result;
|
|
|
|
assert(read_async_info->buf_.CurrentSize() == 0);
|
|
|
|
StopWatch sw(clock_, stats_, hist_type_,
|
|
GetFileReadHistograms(stats_, opts.io_activity),
|
|
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
|
|
true /*delay_enabled*/);
|
|
s = file_->ReadAsync(aligned_req, opts, read_async_callback,
|
|
read_async_info, io_handle, del_fn, nullptr /*dbg*/);
|
|
} else {
|
|
StopWatch sw(clock_, stats_, hist_type_,
|
|
GetFileReadHistograms(stats_, opts.io_activity),
|
|
(stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
|
|
true /*delay_enabled*/);
|
|
s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
|
|
io_handle, del_fn, nullptr /*dbg*/);
|
|
}
|
|
RecordTick(stats_, READ_ASYNC_MICROS, elapsed);
|
|
|
|
// Suppress false positive clang analyzer warnings.
|
|
// Memory is not released if file_->ReadAsync returns !s.ok(), because
|
|
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
|
|
// called then ReadAsync should always return IOStatus::OK().
|
|
#ifndef __clang_analyzer__
|
|
if (!s.ok()) {
|
|
delete read_async_info;
|
|
}
|
|
#endif // __clang_analyzer__
|
|
|
|
return s;
|
|
}
|
|
|
|
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
|
|
void* cb_arg) {
|
|
ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
|
|
assert(read_async_info);
|
|
assert(read_async_info->cb_);
|
|
|
|
if (use_direct_io() && read_async_info->is_aligned_ == false) {
|
|
// Create FSReadRequest with user provided fields.
|
|
FSReadRequest user_req;
|
|
user_req.scratch = read_async_info->user_scratch_;
|
|
user_req.offset = read_async_info->user_offset_;
|
|
user_req.len = read_async_info->user_len_;
|
|
|
|
// Update results in user_req.
|
|
user_req.result = req.result;
|
|
user_req.status = req.status;
|
|
|
|
read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
|
|
req.result.size());
|
|
|
|
size_t offset_advance_len = static_cast<size_t>(
|
|
/*offset_passed_by_user=*/read_async_info->user_offset_ -
|
|
/*aligned_offset=*/req.offset);
|
|
|
|
size_t res_len = 0;
|
|
if (req.status.ok() &&
|
|
offset_advance_len < read_async_info->buf_.CurrentSize()) {
|
|
res_len =
|
|
std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
|
|
read_async_info->user_len_);
|
|
if (read_async_info->user_aligned_buf_ == nullptr) {
|
|
// Copy the data into user's scratch.
|
|
// Clang analyzer assumes that it will take use_direct_io() == false in
|
|
// ReadAsync and use_direct_io() == true in Callback which cannot be true.
|
|
#ifndef __clang_analyzer__
|
|
read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
|
|
res_len);
|
|
#endif // __clang_analyzer__
|
|
} else {
|
|
// Set aligned_buf provided by user without additional copy.
|
|
user_req.scratch =
|
|
read_async_info->buf_.BufferStart() + offset_advance_len;
|
|
read_async_info->user_aligned_buf_->reset(
|
|
read_async_info->buf_.Release());
|
|
}
|
|
user_req.result = Slice(user_req.scratch, res_len);
|
|
} else {
|
|
// Either req.status is not ok or data was not read.
|
|
user_req.result = Slice();
|
|
}
|
|
read_async_info->cb_(user_req, read_async_info->cb_arg_);
|
|
} else {
|
|
read_async_info->cb_(req, read_async_info->cb_arg_);
|
|
}
|
|
|
|
// Update stats and notify listeners.
|
|
if (stats_ != nullptr && file_read_hist_ != nullptr) {
|
|
// elapsed doesn't take into account delay and overwrite as StopWatch does
|
|
// in Read.
|
|
uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
|
|
file_read_hist_->Add(elapsed);
|
|
}
|
|
if (req.status.ok()) {
|
|
RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
|
|
} else if (!req.status.IsAborted()) {
|
|
RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
|
|
}
|
|
if (ShouldNotifyListeners()) {
|
|
auto finish_ts = FileOperationInfo::FinishNow();
|
|
NotifyOnFileReadFinish(req.offset, req.result.size(),
|
|
read_async_info->fs_start_ts_, finish_ts,
|
|
req.status);
|
|
}
|
|
if (!req.status.ok()) {
|
|
NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
|
|
req.result.size(), req.offset);
|
|
}
|
|
RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
|
|
delete read_async_info;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|