diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 77539abd2f..cef3391961 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -66,6 +66,17 @@ void FilePrefetchBuffer::CalculateOffsetAndLen(size_t alignment, // chunk_len is greater than 0. bufs_[index].buffer_.RefitTail(static_cast(chunk_offset_in_buffer), static_cast(chunk_len)); + } else if (chunk_len > 0) { + // For async prefetching, it doesn't call RefitTail with chunk_len > 0. + // Allocate new buffer if needed because aligned buffer calculate remaining + // buffer as capacity_ - cursize_ which might not be the case in this as we + // are not refitting. + // TODO akanksha: Update the condition when asynchronous prefetching is + // stable. + bufs_[index].buffer_.Alignment(alignment); + bufs_[index].buffer_.AllocateNewBuffer( + static_cast(roundup_len), copy_data_to_new_buffer, + chunk_offset_in_buffer, static_cast(chunk_len)); } } @@ -236,34 +247,47 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // Index of second buffer. uint32_t second = curr_ ^ 1; + // First clear the buffers if it contains outdated data. Outdated data can be + // because previous sequential reads were read from the cache instead of these + // buffer. + { + if (bufs_[curr_].buffer_.CurrentSize() > 0 && + offset >= bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + bufs_[curr_].buffer_.Clear(); + } + if (bufs_[second].buffer_.CurrentSize() > 0 && + offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { + bufs_[second].buffer_.Clear(); + } + } + // If data is in second buffer, make it curr_. Second buffer can be either // partial filled or full. if (bufs_[second].buffer_.CurrentSize() > 0 && offset >= bufs_[second].offset_ && - offset <= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { + offset < bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { // Clear the curr_ as buffers have been swapped and curr_ contains the - // outdated data. + // outdated data and switch the buffers. bufs_[curr_].buffer_.Clear(); - // Switch the buffers. curr_ = curr_ ^ 1; second = curr_ ^ 1; } - - // If second buffer contains outdated data, clear it for async prefetching. - // Outdated can be because previous sequential reads were read from the cache - // instead of this buffer. - if (bufs_[second].buffer_.CurrentSize() > 0 && - offset >= bufs_[second].offset_ + bufs_[second].buffer_.CurrentSize()) { - bufs_[second].buffer_.Clear(); + // After swap check if all the requested bytes are in curr_, it will go for + // async prefetching only. + if (bufs_[curr_].buffer_.CurrentSize() > 0 && + offset + length <= + bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize()) { + offset += length; + length = 0; + prefetch_size -= length; } - // Data is overlapping i.e. some of the data is in curr_ buffer and remaining // in second buffer. if (bufs_[curr_].buffer_.CurrentSize() > 0 && bufs_[second].buffer_.CurrentSize() > 0 && offset >= bufs_[curr_].offset_ && offset < bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize() && - offset + prefetch_size > bufs_[second].offset_) { + offset + length > bufs_[second].offset_) { // Allocate new buffer to third buffer; bufs_[2].buffer_.Clear(); bufs_[2].buffer_.Alignment(alignment); @@ -273,12 +297,10 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // Move data from curr_ buffer to third. CopyDataToBuffer(curr_, offset, length); - if (length == 0) { // Requested data has been copied and curr_ still has unconsumed data. return s; } - CopyDataToBuffer(second, offset, length); // Length == 0: All the requested data has been copied to third buffer. It // should go for only async prefetching. @@ -306,6 +328,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, if (length > 0) { CalculateOffsetAndLen(alignment, offset, roundup_len1, curr_, false /*refit_tail*/, chunk_len1); + assert(roundup_len1 >= chunk_len1); read_len1 = static_cast(roundup_len1 - chunk_len1); } { @@ -316,7 +339,7 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, Roundup(rounddown_start2 + readahead_size, alignment); // For length == 0, do the asynchronous prefetching in second instead of - // synchronous prefetching of remaining prefetch_size. + // synchronous prefetching in curr_. if (length == 0) { rounddown_start2 = bufs_[curr_].offset_ + bufs_[curr_].buffer_.CurrentSize(); @@ -330,8 +353,8 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, // Update the buffer offset. bufs_[second].offset_ = rounddown_start2; + assert(roundup_len2 >= chunk_len2); uint64_t read_len2 = static_cast(roundup_len2 - chunk_len2); - ReadAsync(opts, reader, rate_limiter_priority, read_len2, chunk_len2, rounddown_start2, second) .PermitUncheckedError(); @@ -344,7 +367,6 @@ Status FilePrefetchBuffer::PrefetchAsync(const IOOptions& opts, return s; } } - // Copy remaining requested bytes to third_buffer. if (copy_to_third_buffer && length > 0) { CopyDataToBuffer(curr_, offset, length);