From b931f84e562a6f484fe0e700bdc5471d395a3924 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 16 Sep 2019 10:31:27 -0700 Subject: [PATCH] Divide file_reader_writer.h and .cc (#5803) Summary: file_reader_writer.h and .cc contain several files and helper function, and it's hard to navigate. Separate it to multiple files and put them under file/ Pull Request resolved: https://github.com/facebook/rocksdb/pull/5803 Test Plan: Build whole project using make and cmake. Differential Revision: D17374550 fbshipit-source-id: 10efca907721e7a78ed25bbf74dc5410dea05987 --- CMakeLists.txt | 7 +- TARGETS | 7 +- db/builder.cc | 3 +- db/compaction/compaction_job.cc | 3 +- db/compaction/compaction_job_test.cc | 2 +- db/db_impl/db_impl.cc | 2 +- db/db_impl/db_impl_open.cc | 2 + db/db_test.cc | 1 - db/external_sst_file_ingestion_job.cc | 2 +- db/flush_job_test.cc | 2 +- db/import_column_family_job.cc | 2 +- db/log_reader.cc | 3 +- db/log_reader.h | 5 +- db/log_test.cc | 3 +- db/log_writer.cc | 2 +- db/repair.cc | 2 +- db/table_cache.cc | 3 +- db/table_properties_collector_test.cc | 6 +- db/transaction_log_impl.cc | 2 +- db/version_set.cc | 4 +- db/wal_manager.cc | 2 +- db/wal_manager_test.cc | 2 +- file/file_prefetch_buffer.cc | 133 ++ file/file_prefetch_buffer.h | 97 ++ file/file_util.cc | 4 +- file/filename.cc | 2 +- file/random_access_file_reader.cc | 188 +++ file/random_access_file_reader.h | 120 ++ file/read_write_util.cc | 61 + file/read_write_util.h | 29 + file/readahead_raf.cc | 170 +++ file/readahead_raf.h | 27 + file/sequence_file_reader.cc | 241 ++++ file/sequence_file_reader.h | 66 + file/writable_file_writer.cc | 405 ++++++ file/writable_file_writer.h | 155 +++ logging/env_logger.h | 2 +- options/options_parser.cc | 3 +- src.mk | 7 +- table/block_based/block_based_table_reader.cc | 4 +- table/block_based/block_based_table_reader.h | 2 +- table/block_fetcher.cc | 1 - table/cuckoo/cuckoo_table_builder.cc | 2 +- table/cuckoo/cuckoo_table_builder_test.cc | 3 +- table/cuckoo/cuckoo_table_reader.h | 2 +- table/format.cc | 2 +- table/format.h | 4 +- table/iterator_wrapper.h | 1 + table/meta_blocks.cc | 2 +- table/mock_table.cc | 2 +- table/plain/plain_table_builder.cc | 2 +- table/plain/plain_table_key_coding.cc | 2 +- table/plain/plain_table_reader.h | 2 +- table/sst_file_reader.cc | 2 +- table/sst_file_writer.cc | 2 +- table/table_builder.h | 2 +- table/table_reader_bench.cc | 2 +- test_util/testutil.cc | 4 +- tools/sst_dump_test.cc | 2 +- tools/sst_dump_tool_imp.h | 2 +- tools/trace_analyzer_test.cc | 1 + tools/trace_analyzer_tool.cc | 3 +- util/file_reader_writer.cc | 1085 ----------------- util/file_reader_writer.h | 407 ------- util/file_reader_writer_test.cc | 5 +- util/log_write_bench.cc | 2 +- utilities/backupable/backupable_db.cc | 3 +- utilities/backupable/backupable_db_test.cc | 1 - utilities/blob_db/blob_db_impl.cc | 3 +- utilities/blob_db/blob_dump_tool.cc | 5 +- utilities/blob_db/blob_dump_tool.h | 2 +- utilities/blob_db/blob_file.cc | 1 + utilities/blob_db/blob_file.h | 2 +- utilities/blob_db/blob_log_reader.cc | 2 +- utilities/blob_db/blob_log_reader.h | 2 +- utilities/blob_db/blob_log_writer.cc | 2 +- .../persistent_cache/block_cache_tier_file.h | 3 +- utilities/simulator_cache/sim_cache.cc | 2 +- utilities/trace/file_trace_reader_writer.cc | 3 +- 79 files changed, 1799 insertions(+), 1559 deletions(-) create mode 100644 file/file_prefetch_buffer.cc create mode 100644 file/file_prefetch_buffer.h create mode 100644 file/random_access_file_reader.cc create mode 100644 file/random_access_file_reader.h create mode 100644 file/read_write_util.cc create mode 100644 file/read_write_util.h create mode 100644 file/readahead_raf.cc create mode 100644 file/readahead_raf.h create mode 100644 file/sequence_file_reader.cc create mode 100644 file/sequence_file_reader.h create mode 100644 file/writable_file_writer.cc create mode 100644 file/writable_file_writer.h delete mode 100644 util/file_reader_writer.cc delete mode 100644 util/file_reader_writer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 9076af3ab0..e5412ab903 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -547,9 +547,15 @@ set(SOURCES env/env_hdfs.cc env/mock_env.cc file/delete_scheduler.cc + file/file_prefetch_buffer.cc file/file_util.cc file/filename.cc + file/random_access_file_reader.cc + file/read_write_util.cc + file/readahead_raf.cc + file/sequence_file_reader.cc file/sst_file_manager_impl.cc + file/writable_file_writer.cc logging/auto_roll_logger.cc logging/event_logger.cc logging/log_buffer.cc @@ -639,7 +645,6 @@ set(SOURCES util/concurrent_task_limiter_impl.cc util/crc32c.cc util/dynamic_bloom.cc - util/file_reader_writer.cc util/filter_policy.cc util/hash.cc util/murmurhash.cc diff --git a/TARGETS b/TARGETS index b603d027f0..e1af516e0b 100644 --- a/TARGETS +++ b/TARGETS @@ -176,9 +176,15 @@ cpp_library( "env/io_posix.cc", "env/mock_env.cc", "file/delete_scheduler.cc", + "file/file_prefetch_buffer.cc", "file/file_util.cc", "file/filename.cc", + "file/random_access_file_reader.cc", + "file/read_write_util.cc", + "file/readahead_raf.cc", + "file/sequence_file_reader.cc", "file/sst_file_manager_impl.cc", + "file/writable_file_writer.cc", "logging/auto_roll_logger.cc", "logging/event_logger.cc", "logging/log_buffer.cc", @@ -267,7 +273,6 @@ cpp_library( "util/concurrent_task_limiter_impl.cc", "util/crc32c.cc", "util/dynamic_bloom.cc", - "util/file_reader_writer.cc", "util/filter_policy.cc", "util/hash.cc", "util/murmurhash.cc", diff --git a/db/builder.cc b/db/builder.cc index eac1b5fe2e..01ed32e0c6 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -22,6 +22,8 @@ #include "db/table_cache.h" #include "db/version_edit.h" #include "file/filename.h" +#include "file/read_write_util.h" +#include "file/writable_file_writer.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/thread_status_util.h" #include "rocksdb/db.h" @@ -33,7 +35,6 @@ #include "table/format.h" #include "table/internal_iterator.h" #include "test_util/sync_point.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" namespace rocksdb { diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 663c8aa0a8..1acc48b4c4 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -34,7 +34,9 @@ #include "db/range_del_aggregator.h" #include "db/version_set.h" #include "file/filename.h" +#include "file/read_write_util.h" #include "file/sst_file_manager_impl.h" +#include "file/writable_file_writer.h" #include "logging/log_buffer.h" #include "logging/logging.h" #include "monitoring/iostats_context_imp.h" @@ -52,7 +54,6 @@ #include "table/table_builder.h" #include "test_util/sync_point.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/random.h" #include "util/stop_watch.h" diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index 8f858e092d..8f9db07362 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -17,6 +17,7 @@ #include "db/db_impl/db_impl.h" #include "db/error_handler.h" #include "db/version_set.h" +#include "file/writable_file_writer.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/options.h" @@ -24,7 +25,6 @@ #include "table/mock_table.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" #include "utilities/merge_operators.h" diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 4bcab93a86..8bc7302504 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -54,6 +54,7 @@ #include "db/write_callback.h" #include "file/file_util.h" #include "file/filename.h" +#include "file/random_access_file_reader.h" #include "file/sst_file_manager_impl.h" #include "logging/auto_roll_logger.h" #include "logging/log_buffer.h" @@ -96,7 +97,6 @@ #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/stop_watch.h" #include "util/string_util.h" diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 4fed3fea33..5078748d03 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -12,7 +12,9 @@ #include "db/builder.h" #include "db/error_handler.h" +#include "file/read_write_util.h" #include "file/sst_file_manager_impl.h" +#include "file/writable_file_writer.h" #include "monitoring/persistent_stats_history.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" diff --git a/db/db_test.cc b/db/db_test.cc index 116403402e..60a077f57f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -62,7 +62,6 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/compression.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/rate_limiter.h" #include "util/string_util.h" diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 3926d7fa9f..c11b346a2e 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -16,12 +16,12 @@ #include "db/db_impl/db_impl.h" #include "db/version_edit.h" #include "file/file_util.h" +#include "file/random_access_file_reader.h" #include "table/merging_iterator.h" #include "table/scoped_arena_iterator.h" #include "table/sst_file_writer_collectors.h" #include "table/table_builder.h" #include "test_util/sync_point.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" namespace rocksdb { diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 3725aae956..e4400e8435 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -11,12 +11,12 @@ #include "db/db_impl/db_impl.h" #include "db/flush_job.h" #include "db/version_set.h" +#include "file/writable_file_writer.h" #include "rocksdb/cache.h" #include "rocksdb/write_buffer_manager.h" #include "table/mock_table.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" namespace rocksdb { diff --git a/db/import_column_family_job.cc b/db/import_column_family_job.cc index cd59140698..7a6aa73442 100644 --- a/db/import_column_family_job.cc +++ b/db/import_column_family_job.cc @@ -9,11 +9,11 @@ #include "db/version_edit.h" #include "file/file_util.h" +#include "file/random_access_file_reader.h" #include "table/merging_iterator.h" #include "table/scoped_arena_iterator.h" #include "table/sst_file_writer_collectors.h" #include "table/table_builder.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" namespace rocksdb { diff --git a/db/log_reader.cc b/db/log_reader.cc index e734e9d6c8..3a71cbc429 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -10,10 +10,11 @@ #include "db/log_reader.h" #include +#include "file/sequence_file_reader.h" #include "rocksdb/env.h" +#include "test_util/sync_point.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/util.h" namespace rocksdb { diff --git a/db/log_reader.h b/db/log_reader.h index efeb270e22..5f9cb981db 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -12,13 +12,12 @@ #include #include "db/log_format.h" +#include "file/sequence_file_reader.h" +#include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "rocksdb/options.h" namespace rocksdb { - -class SequentialFileReader; class Logger; namespace log { diff --git a/db/log_test.cc b/db/log_test.cc index be7a3cbe7c..ecfae3e2db 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -9,12 +9,13 @@ #include "db/log_reader.h" #include "db/log_writer.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" #include "rocksdb/env.h" #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/random.h" namespace rocksdb { diff --git a/db/log_writer.cc b/db/log_writer.cc index c46965e16e..53efc6c15b 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -10,10 +10,10 @@ #include "db/log_writer.h" #include +#include "file/writable_file_writer.h" #include "rocksdb/env.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" namespace rocksdb { namespace log { diff --git a/db/repair.cc b/db/repair.cc index 0f0d329ccd..3557eb1399 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -71,6 +71,7 @@ #include "db/version_edit.h" #include "db/write_batch_internal.h" #include "file/filename.h" +#include "file/writable_file_writer.h" #include "options/cf_options.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" @@ -78,7 +79,6 @@ #include "rocksdb/options.h" #include "rocksdb/write_buffer_manager.h" #include "table/scoped_arena_iterator.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" namespace rocksdb { diff --git a/db/table_cache.cc b/db/table_cache.cc index bd85fe0d3d..98070be698 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -14,7 +14,7 @@ #include "db/snapshot_impl.h" #include "db/version_edit.h" #include "file/filename.h" - +#include "file/random_access_file_reader.h" #include "monitoring/perf_context_imp.h" #include "rocksdb/statistics.h" #include "table/block_based/block_based_table_reader.h" @@ -27,7 +27,6 @@ #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" namespace rocksdb { diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index a9895bbedb..e479fa008b 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -9,10 +9,11 @@ #include #include +#include "db/db_impl/db_impl.h" #include "db/dbformat.h" #include "db/table_properties_collector.h" - -#include "db/db_impl/db_impl.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" #include "options/cf_options.h" #include "rocksdb/table.h" #include "table/block_based/block_based_table_factory.h" @@ -22,7 +23,6 @@ #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/coding.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 55f87ede96..42c724c036 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -8,7 +8,7 @@ #include "db/transaction_log_impl.h" #include #include "db/write_batch_internal.h" -#include "util/file_reader_writer.h" +#include "file/sequence_file_reader.h" namespace rocksdb { diff --git a/db/version_set.cc b/db/version_set.cc index e3c2397cdb..a8ae98550b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -30,6 +30,9 @@ #include "db/table_cache.h" #include "db/version_builder.h" #include "file/filename.h" +#include "file/random_access_file_reader.h" +#include "file/read_write_util.h" +#include "file/writable_file_writer.h" #include "monitoring/file_read_sample.h" #include "monitoring/perf_context_imp.h" #include "monitoring/persistent_stats_history.h" @@ -47,7 +50,6 @@ #include "table/two_level_iterator.h" #include "test_util/sync_point.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/user_comparator_wrapper.h" diff --git a/db/wal_manager.cc b/db/wal_manager.cc index 6c21ab4a00..783e1c7acd 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -20,6 +20,7 @@ #include "db/write_batch_internal.h" #include "file/file_util.h" #include "file/filename.h" +#include "file/sequence_file_reader.h" #include "logging/logging.h" #include "port/port.h" #include "rocksdb/env.h" @@ -28,7 +29,6 @@ #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/string_util.h" diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 671dc84e1b..089c49cc63 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -18,10 +18,10 @@ #include "db/version_set.h" #include "db/wal_manager.h" #include "env/mock_env.h" +#include "file/writable_file_writer.h" #include "table/mock_table.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" namespace rocksdb { diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc new file mode 100644 index 0000000000..89f32c6ff0 --- /dev/null +++ b/file/file_prefetch_buffer.cc @@ -0,0 +1,133 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/file_prefetch_buffer.h" + +#include +#include + +#include "file/random_access_file_reader.h" +#include "monitoring/histogram.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace rocksdb { +Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, + uint64_t offset, size_t n, + bool for_compaction) { + size_t alignment = reader->file()->GetRequiredBufferAlignment(); + size_t offset_ = static_cast(offset); + uint64_t rounddown_offset = Rounddown(offset_, alignment); + uint64_t roundup_end = Roundup(offset_ + n, alignment); + uint64_t roundup_len = roundup_end - rounddown_offset; + assert(roundup_len >= alignment); + assert(roundup_len % alignment == 0); + + // Check if requested bytes are in the existing buffer_. + // If all bytes exist -- return. + // If only a few bytes exist -- reuse them & read only what is really needed. + // This is typically the case of incremental reading of data. + // If no bytes exist in buffer -- full pread. + + Status s; + uint64_t chunk_offset_in_buffer = 0; + uint64_t chunk_len = 0; + bool copy_data_to_new_buffer = false; + if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ && + offset <= buffer_offset_ + buffer_.CurrentSize()) { + if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) { + // All requested bytes are already in the buffer. So no need to Read + // again. + return s; + } else { + // Only a few requested bytes are in the buffer. memmove those chunk of + // bytes to the beginning, and memcpy them back into the new buffer if a + // new buffer is created. + chunk_offset_in_buffer = + Rounddown(static_cast(offset - buffer_offset_), alignment); + chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer; + assert(chunk_offset_in_buffer % alignment == 0); + assert(chunk_len % alignment == 0); + assert(chunk_offset_in_buffer + chunk_len <= + buffer_offset_ + buffer_.CurrentSize()); + if (chunk_len > 0) { + copy_data_to_new_buffer = true; + } else { + // this reset is not necessary, but just to be safe. + chunk_offset_in_buffer = 0; + } + } + } + + // Create a new buffer only if current capacity is not sufficient, and memcopy + // bytes from old buffer if needed (i.e., if chunk_len is greater than 0). + if (buffer_.Capacity() < roundup_len) { + buffer_.Alignment(alignment); + buffer_.AllocateNewBuffer(static_cast(roundup_len), + copy_data_to_new_buffer, chunk_offset_in_buffer, + static_cast(chunk_len)); + } else if (chunk_len > 0) { + // New buffer not needed. But memmove bytes from tail to the beginning since + // chunk_len is greater than 0. + buffer_.RefitTail(static_cast(chunk_offset_in_buffer), + static_cast(chunk_len)); + } + + Slice result; + s = reader->Read(rounddown_offset + chunk_len, + static_cast(roundup_len - chunk_len), &result, + buffer_.BufferStart() + chunk_len, for_compaction); + if (s.ok()) { + buffer_offset_ = rounddown_offset; + buffer_.Size(static_cast(chunk_len) + result.size()); + } + return s; +} + +bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, + Slice* result, bool for_compaction) { + if (track_min_offset_ && offset < min_offset_read_) { + min_offset_read_ = static_cast(offset); + } + if (!enable_ || offset < buffer_offset_) { + return false; + } + + // If the buffer contains only a few of the requested bytes: + // If readahead is enabled: prefetch the remaining bytes + readadhead bytes + // and satisfy the request. + // If readahead is not enabled: return false. + if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { + if (readahead_size_ > 0) { + assert(file_reader_ != nullptr); + assert(max_readahead_size_ >= readahead_size_); + Status s; + if (for_compaction) { + s = Prefetch(file_reader_, offset, std::max(n, readahead_size_), + for_compaction); + } else { + s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction); + } + if (!s.ok()) { + return false; + } + readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); + } else { + return false; + } + } + + uint64_t offset_in_buffer = offset - buffer_offset_; + *result = Slice(buffer_.BufferStart() + offset_in_buffer, n); + return true; +} +} // namespace rocksdb diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h new file mode 100644 index 0000000000..c3cacf1020 --- /dev/null +++ b/file/file_prefetch_buffer.h @@ -0,0 +1,97 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include +#include "file/random_access_file_reader.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/aligned_buffer.h" + +namespace rocksdb { + +// FilePrefetchBuffer is a smart buffer to store and read data from a file. +class FilePrefetchBuffer { + public: + // Constructor. + // + // All arguments are optional. + // file_reader : the file reader to use. Can be a nullptr. + // readahead_size : the initial readahead size. + // max_readahead_size : the maximum readahead size. + // If max_readahead_size > readahead_size, the readahead size will be + // doubled on every IO until max_readahead_size is hit. + // Typically this is set as a multiple of readahead_size. + // max_readahead_size should be greater than equal to readahead_size. + // enable : controls whether reading from the buffer is enabled. + // If false, TryReadFromCache() always return false, and we only take stats + // for the minimum offset if track_min_offset = true. + // track_min_offset : Track the minimum offset ever read and collect stats on + // it. Used for adaptable readahead of the file footer/metadata. + // + // Automatic readhead is enabled for a file if file_reader, readahead_size, + // and max_readahead_size are passed in. + // If file_reader is a nullptr, setting readadhead_size and max_readahead_size + // does not make any sense. So it does nothing. + // A user can construct a FilePrefetchBuffer without any arguments, but use + // `Prefetch` to load data into the buffer. + FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr, + size_t readadhead_size = 0, size_t max_readahead_size = 0, + bool enable = true, bool track_min_offset = false) + : buffer_offset_(0), + file_reader_(file_reader), + readahead_size_(readadhead_size), + max_readahead_size_(max_readahead_size), + min_offset_read_(port::kMaxSizet), + enable_(enable), + track_min_offset_(track_min_offset) {} + + // Load data into the buffer from a file. + // reader : the file reader. + // offset : the file offset to start reading from. + // n : the number of bytes to read. + // for_compaction : if prefetch is done for compaction read. + Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n, + bool for_compaction = false); + + // Tries returning the data for a file raed from this buffer, if that data is + // in the buffer. + // It handles tracking the minimum read offset if track_min_offset = true. + // It also does the exponential readahead when readadhead_size is set as part + // of the constructor. + // + // offset : the file offset. + // n : the number of bytes. + // result : output buffer to put the data into. + // for_compaction : if cache read is done for compaction read. + bool TryReadFromCache(uint64_t offset, size_t n, Slice* result, + bool for_compaction = false); + + // The minimum `offset` ever passed to TryReadFromCache(). This will nly be + // tracked if track_min_offset = true. + size_t min_offset_read() const { return min_offset_read_; } + + private: + AlignedBuffer buffer_; + uint64_t buffer_offset_; + RandomAccessFileReader* file_reader_; + size_t readahead_size_; + size_t max_readahead_size_; + // The minimum `offset` ever passed to TryReadFromCache(). + size_t min_offset_read_; + // if false, TryReadFromCache() always return false, and we only take stats + // for track_min_offset_ if track_min_offset_ = true + bool enable_; + // If true, track minimum `offset` ever passed to TryReadFromCache(), which + // can be fetched from min_offset_read(). + bool track_min_offset_; +}; +} // namespace rocksdb diff --git a/file/file_util.cc b/file/file_util.cc index ee52bf640f..f1bf6596ba 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -8,9 +8,11 @@ #include #include +#include "file/random_access_file_reader.h" +#include "file/sequence_file_reader.h" #include "file/sst_file_manager_impl.h" +#include "file/writable_file_writer.h" #include "rocksdb/env.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/file/filename.cc b/file/filename.cc index a6360b5046..5a3fa29022 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -12,10 +12,10 @@ #include #include #include +#include "file/writable_file_writer.h" #include "logging/logging.h" #include "rocksdb/env.h" #include "test_util/sync_point.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "util/string_util.h" diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc new file mode 100644 index 0000000000..5b5a19ff86 --- /dev/null +++ b/file/random_access_file_reader.cc @@ -0,0 +1,188 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/random_access_file_reader.h" + +#include +#include + +#include "monitoring/histogram.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace rocksdb { +Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, + char* scratch, bool for_compaction) const { + Status s; + uint64_t elapsed = 0; + { + StopWatch sw(env_, stats_, hist_type_, + (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, + true /*delay_enabled*/); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_TIMER_GUARD(read_nanos); + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = + TruncateToPageBoundary(alignment, static_cast(offset)); + size_t offset_advance = static_cast(offset) - aligned_offset; + size_t read_size = + Roundup(static_cast(offset + n), alignment) - aligned_offset; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(read_size); + while (buf.CurrentSize() < read_size) { + size_t allowed; + if (for_compaction && rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + buf.Capacity() - buf.CurrentSize(), buf.Alignment(), + Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); + } else { + assert(buf.CurrentSize() == 0); + allowed = read_size; + } + Slice tmp; + + FileOperationInfo::TimePoint start_ts; + uint64_t orig_offset = 0; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + orig_offset = aligned_offset + buf.CurrentSize(); + } + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, + buf.Destination()); + } + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, + s); + } + + buf.Size(buf.CurrentSize() + tmp.size()); + if (!s.ok() || tmp.size() < allowed) { + break; + } + } + size_t res_len = 0; + if (s.ok() && offset_advance < buf.CurrentSize()) { + res_len = buf.Read(scratch, offset_advance, + std::min(buf.CurrentSize() - offset_advance, n)); + } + *result = Slice(scratch, res_len); +#endif // !ROCKSDB_LITE + } else { + size_t pos = 0; + const char* res_scratch = nullptr; + while (pos < n) { + size_t allowed; + if (for_compaction && rate_limiter_ != nullptr) { + if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { + sw.DelayStart(); + } + allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, + Env::IOPriority::IO_LOW, stats_, + RateLimiter::OpType::kRead); + if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { + sw.DelayStop(); + } + } else { + allowed = n; + } + Slice tmp_result; + +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, + finish_ts, s); + } +#endif + + if (res_scratch == nullptr) { + // we can't simply use `scratch` because reads of mmap'd files return + // data in a different buffer. + res_scratch = tmp_result.data(); + } else { + // make sure chunks are inserted contiguously into `res_scratch`. + assert(tmp_result.data() == res_scratch + pos); + } + pos += tmp_result.size(); + if (!s.ok() || tmp_result.size() < allowed) { + break; + } + } + *result = Slice(res_scratch, s.ok() ? pos : 0); + } + IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); + SetPerfLevel(prev_perf_level); + } + if (stats_ != nullptr && file_read_hist_ != nullptr) { + file_read_hist_->Add(elapsed); + } + + return s; +} + +Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs, + size_t num_reqs) const { + Status s; + uint64_t elapsed = 0; + assert(!use_direct_io()); + { + StopWatch sw(env_, stats_, hist_type_, + (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, + true /*delay_enabled*/); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_TIMER_GUARD(read_nanos); + +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } +#endif // ROCKSDB_LITE + { + IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); + s = file_->MultiRead(read_reqs, num_reqs); + } + for (size_t i = 0; i < num_reqs; ++i) { +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(), + start_ts, finish_ts, read_reqs[i].status); + } +#endif // ROCKSDB_LITE + IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size()); + } + SetPerfLevel(prev_perf_level); + } + if (stats_ != nullptr && file_read_hist_ != nullptr) { + file_read_hist_->Add(elapsed); + } + + return s; +} +} // namespace rocksdb diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h new file mode 100644 index 0000000000..abbc71ff11 --- /dev/null +++ b/file/random_access_file_reader.h @@ -0,0 +1,120 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/listener.h" +#include "rocksdb/rate_limiter.h" +#include "util/aligned_buffer.h" + +namespace rocksdb { + +class Statistics; +class HistogramImpl; + +// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is +// responsible for: +// - Handling Buffered and Direct reads appropriately. +// - Rate limiting compaction reads. +// - Notifying any interested listeners on the completion of a read. +// - Updating IO stats. +class RandomAccessFileReader { + private: +#ifndef ROCKSDB_LITE + void NotifyOnFileReadFinish(uint64_t offset, size_t length, + const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, + const Status& status) const { + FileOperationInfo info(file_name_, start_ts, finish_ts); + info.offset = offset; + info.length = length; + info.status = status; + + for (auto& listener : listeners_) { + listener->OnFileReadFinish(info); + } + } +#endif // ROCKSDB_LITE + + bool ShouldNotifyListeners() const { return !listeners_.empty(); } + + std::unique_ptr file_; + std::string file_name_; + Env* env_; + Statistics* stats_; + uint32_t hist_type_; + HistogramImpl* file_read_hist_; + RateLimiter* rate_limiter_; + std::vector> listeners_; + + public: + explicit RandomAccessFileReader( + std::unique_ptr&& raf, std::string _file_name, + Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, + HistogramImpl* file_read_hist = nullptr, + RateLimiter* rate_limiter = nullptr, + const std::vector>& listeners = {}) + : file_(std::move(raf)), + file_name_(std::move(_file_name)), + env_(env), + stats_(stats), + hist_type_(hist_type), + file_read_hist_(file_read_hist), + rate_limiter_(rate_limiter), + listeners_() { +#ifndef ROCKSDB_LITE + std::for_each(listeners.begin(), listeners.end(), + [this](const std::shared_ptr& e) { + if (e->ShouldBeNotifiedOnFileIO()) { + listeners_.emplace_back(e); + } + }); +#else // !ROCKSDB_LITE + (void)listeners; +#endif + } + + RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + RandomAccessFileReader& operator=(RandomAccessFileReader&& o) + ROCKSDB_NOEXCEPT { + file_ = std::move(o.file_); + env_ = std::move(o.env_); + stats_ = std::move(o.stats_); + hist_type_ = std::move(o.hist_type_); + file_read_hist_ = std::move(o.file_read_hist_); + rate_limiter_ = std::move(o.rate_limiter_); + return *this; + } + + RandomAccessFileReader(const RandomAccessFileReader&) = delete; + RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; + + Status Read(uint64_t offset, size_t n, Slice* result, char* scratch, + bool for_compaction = false) const; + + Status MultiRead(ReadRequest* reqs, size_t num_reqs) const; + + Status Prefetch(uint64_t offset, size_t n) const { + return file_->Prefetch(offset, n); + } + + RandomAccessFile* file() { return file_.get(); } + + std::string file_name() const { return file_name_; } + + bool use_direct_io() const { return file_->use_direct_io(); } +}; +} // namespace rocksdb diff --git a/file/read_write_util.cc b/file/read_write_util.cc new file mode 100644 index 0000000000..8614fcaa8d --- /dev/null +++ b/file/read_write_util.cc @@ -0,0 +1,61 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/read_write_util.h" + +#include +#include "test_util/sync_point.h" + +namespace rocksdb { +Status NewWritableFile(Env* env, const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) { + Status s = env->NewWritableFile(fname, result, options); + TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2); + return s; +} + +bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file, + std::string* output, bool* has_data, Status* result) { + const int kBufferSize = 8192; + char buffer[kBufferSize + 1]; + Slice input_slice; + + std::string line; + bool has_complete_line = false; + while (!has_complete_line) { + if (std::getline(*iss, line)) { + has_complete_line = !iss->eof(); + } else { + has_complete_line = false; + } + if (!has_complete_line) { + // if we're not sure whether we have a complete line, + // further read from the file. + if (*has_data) { + *result = seq_file->Read(kBufferSize, &input_slice, buffer); + } + if (input_slice.size() == 0) { + // meaning we have read all the data + *has_data = false; + break; + } else { + iss->str(line + input_slice.ToString()); + // reset the internal state of iss so that we can keep reading it. + iss->clear(); + *has_data = (input_slice.size() == kBufferSize); + continue; + } + } + } + *output = line; + return *has_data || has_complete_line; +} + +} // namespace rocksdb diff --git a/file/read_write_util.h b/file/read_write_util.h new file mode 100644 index 0000000000..7c344728fd --- /dev/null +++ b/file/read_write_util.h @@ -0,0 +1,29 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include "rocksdb/env.h" + +namespace rocksdb { +// Returns a WritableFile. +// +// env : the Env. +// fname : the file name. +// result : output arg. A WritableFile based on `fname` returned. +// options : the Env Options. +extern Status NewWritableFile(Env* env, const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options); + +// Read a single line from a file. +bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file, + std::string* output, bool* has_data, Status* result); + +} // namespace rocksdb diff --git a/file/readahead_raf.cc b/file/readahead_raf.cc new file mode 100644 index 0000000000..5c5582d141 --- /dev/null +++ b/file/readahead_raf.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/readahead_raf.h" + +#include +#include +#include "util/aligned_buffer.h" +#include "util/rate_limiter.h" + +namespace rocksdb { + +#ifndef NDEBUG +namespace { +bool IsFileSectorAligned(const size_t off, size_t sector_size) { + return off % sector_size == 0; +} +} // namespace +#endif + +namespace { +class ReadaheadRandomAccessFile : public RandomAccessFile { + public: + ReadaheadRandomAccessFile(std::unique_ptr&& file, + size_t readahead_size) + : file_(std::move(file)), + alignment_(file_->GetRequiredBufferAlignment()), + readahead_size_(Roundup(readahead_size, alignment_)), + buffer_(), + buffer_offset_(0) { + buffer_.Alignment(alignment_); + buffer_.AllocateNewBuffer(readahead_size_); + } + + ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; + + ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = + delete; + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + // Read-ahead only make sense if we have some slack left after reading + if (n + alignment_ >= readahead_size_) { + return file_->Read(offset, n, result, scratch); + } + + std::unique_lock lk(lock_); + + size_t cached_len = 0; + // Check if there is a cache hit, meaning that [offset, offset + n) is + // either completely or partially in the buffer. If it's completely cached, + // including end of file case when offset + n is greater than EOF, then + // return. + if (TryReadFromCache(offset, n, &cached_len, scratch) && + (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { + // We read exactly what we needed, or we hit end of file - return. + *result = Slice(scratch, cached_len); + return Status::OK(); + } + size_t advanced_offset = static_cast(offset + cached_len); + // In the case of cache hit advanced_offset is already aligned, means that + // chunk_offset equals to advanced_offset + size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); + + Status s = ReadIntoBuffer(chunk_offset, readahead_size_); + if (s.ok()) { + // The data we need is now in cache, so we can safely read it + size_t remaining_len; + TryReadFromCache(advanced_offset, n - cached_len, &remaining_len, + scratch + cached_len); + *result = Slice(scratch, cached_len + remaining_len); + } + return s; + } + + Status Prefetch(uint64_t offset, size_t n) override { + if (n < readahead_size_) { + // Don't allow smaller prefetches than the configured `readahead_size_`. + // `Read()` assumes a smaller prefetch buffer indicates EOF was reached. + return Status::OK(); + } + + std::unique_lock lk(lock_); + + size_t offset_ = static_cast(offset); + size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_); + if (prefetch_offset == buffer_offset_) { + return Status::OK(); + } + return ReadIntoBuffer(prefetch_offset, + Roundup(offset_ + n, alignment_) - prefetch_offset); + } + + size_t GetUniqueId(char* id, size_t max_size) const override { + return file_->GetUniqueId(id, max_size); + } + + void Hint(AccessPattern pattern) override { file_->Hint(pattern); } + + Status InvalidateCache(size_t offset, size_t length) override { + std::unique_lock lk(lock_); + buffer_.Clear(); + return file_->InvalidateCache(offset, length); + } + + bool use_direct_io() const override { return file_->use_direct_io(); } + + private: + // Tries to read from buffer_ n bytes starting at offset. If anything was read + // from the cache, it sets cached_len to the number of bytes actually read, + // copies these number of bytes to scratch and returns true. + // If nothing was read sets cached_len to 0 and returns false. + bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len, + char* scratch) const { + if (offset < buffer_offset_ || + offset >= buffer_offset_ + buffer_.CurrentSize()) { + *cached_len = 0; + return false; + } + uint64_t offset_in_buffer = offset - buffer_offset_; + *cached_len = std::min( + buffer_.CurrentSize() - static_cast(offset_in_buffer), n); + memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); + return true; + } + + // Reads into buffer_ the next n bytes from file_ starting at offset. + // Can actually read less if EOF was reached. + // Returns the status of the read operastion on the file. + Status ReadIntoBuffer(uint64_t offset, size_t n) const { + if (n > buffer_.Capacity()) { + n = buffer_.Capacity(); + } + assert(IsFileSectorAligned(offset, alignment_)); + assert(IsFileSectorAligned(n, alignment_)); + Slice result; + Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); + if (s.ok()) { + buffer_offset_ = offset; + buffer_.Size(result.size()); + assert(result.size() == 0 || buffer_.BufferStart() == result.data()); + } + return s; + } + + const std::unique_ptr file_; + const size_t alignment_; + const size_t readahead_size_; + + mutable std::mutex lock_; + // The buffer storing the prefetched data + mutable AlignedBuffer buffer_; + // The offset in file_, corresponding to data stored in buffer_ + mutable uint64_t buffer_offset_; +}; +} // namespace + +std::unique_ptr NewReadaheadRandomAccessFile( + std::unique_ptr&& file, size_t readahead_size) { + std::unique_ptr result( + new ReadaheadRandomAccessFile(std::move(file), readahead_size)); + return result; +} +} // namespace rocksdb diff --git a/file/readahead_raf.h b/file/readahead_raf.h new file mode 100644 index 0000000000..f6d64e77ac --- /dev/null +++ b/file/readahead_raf.h @@ -0,0 +1,27 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include "rocksdb/env.h" + +namespace rocksdb { +// This file provides the following main abstractions: +// SequentialFileReader : wrapper over Env::SequentialFile +// RandomAccessFileReader : wrapper over Env::RandomAccessFile +// WritableFileWriter : wrapper over Env::WritableFile +// In addition, it also exposed NewReadaheadRandomAccessFile, NewWritableFile, +// and ReadOneLine primitives. + +// NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to +// always prefetch additional data with every read. This is mainly used in +// Compaction Table Readers. +std::unique_ptr NewReadaheadRandomAccessFile( + std::unique_ptr&& file, size_t readahead_size); +} // namespace rocksdb diff --git a/file/sequence_file_reader.cc b/file/sequence_file_reader.cc new file mode 100644 index 0000000000..f9b20d067d --- /dev/null +++ b/file/sequence_file_reader.cc @@ -0,0 +1,241 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/sequence_file_reader.h" + +#include +#include + +#include "monitoring/histogram.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/aligned_buffer.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace rocksdb { + +#ifndef NDEBUG +namespace { +bool IsFileSectorAligned(const size_t off, size_t sector_size) { + return off % sector_size == 0; +} +} // namespace +#endif + +Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { + Status s; + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + size_t offset = offset_.fetch_add(n); + size_t alignment = file_->GetRequiredBufferAlignment(); + size_t aligned_offset = TruncateToPageBoundary(alignment, offset); + size_t offset_advance = offset - aligned_offset; + size_t size = Roundup(offset + n, alignment) - aligned_offset; + size_t r = 0; + AlignedBuffer buf; + buf.Alignment(alignment); + buf.AllocateNewBuffer(size); + Slice tmp; + s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); + if (s.ok() && offset_advance < tmp.size()) { + buf.Size(tmp.size()); + r = buf.Read(scratch, offset_advance, + std::min(tmp.size() - offset_advance, n)); + } + *result = Slice(scratch, r); +#endif // !ROCKSDB_LITE + } else { + s = file_->Read(n, result, scratch); + } + IOSTATS_ADD(bytes_read, result->size()); + return s; +} + +Status SequentialFileReader::Skip(uint64_t n) { +#ifndef ROCKSDB_LITE + if (use_direct_io()) { + offset_ += static_cast(n); + return Status::OK(); + } +#endif // !ROCKSDB_LITE + return file_->Skip(n); +} + +namespace { +// This class wraps a SequentialFile, exposing same API, with the differenece +// of being able to prefetch up to readahead_size bytes and then serve them +// from memory, avoiding the entire round-trip if, for example, the data for the +// file is actually remote. +class ReadaheadSequentialFile : public SequentialFile { + public: + ReadaheadSequentialFile(std::unique_ptr&& file, + size_t readahead_size) + : file_(std::move(file)), + alignment_(file_->GetRequiredBufferAlignment()), + readahead_size_(Roundup(readahead_size, alignment_)), + buffer_(), + buffer_offset_(0), + read_offset_(0) { + buffer_.Alignment(alignment_); + buffer_.AllocateNewBuffer(readahead_size_); + } + + ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete; + + ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete; + + Status Read(size_t n, Slice* result, char* scratch) override { + std::unique_lock lk(lock_); + + size_t cached_len = 0; + // Check if there is a cache hit, meaning that [offset, offset + n) is + // either completely or partially in the buffer. If it's completely cached, + // including end of file case when offset + n is greater than EOF, then + // return. + if (TryReadFromCache(n, &cached_len, scratch) && + (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { + // We read exactly what we needed, or we hit end of file - return. + *result = Slice(scratch, cached_len); + return Status::OK(); + } + n -= cached_len; + + Status s; + // Read-ahead only make sense if we have some slack left after reading + if (n + alignment_ >= readahead_size_) { + s = file_->Read(n, result, scratch + cached_len); + if (s.ok()) { + read_offset_ += result->size(); + *result = Slice(scratch, cached_len + result->size()); + } + buffer_.Clear(); + return s; + } + + s = ReadIntoBuffer(readahead_size_); + if (s.ok()) { + // The data we need is now in cache, so we can safely read it + size_t remaining_len; + TryReadFromCache(n, &remaining_len, scratch + cached_len); + *result = Slice(scratch, cached_len + remaining_len); + } + return s; + } + + Status Skip(uint64_t n) override { + std::unique_lock lk(lock_); + Status s = Status::OK(); + // First check if we need to skip already cached data + if (buffer_.CurrentSize() > 0) { + // Do we need to skip beyond cached data? + if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) { + // Yes. Skip whaterver is in memory and adjust offset accordingly + n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_; + read_offset_ = buffer_offset_ + buffer_.CurrentSize(); + } else { + // No. The entire section to be skipped is entirely i cache. + read_offset_ += n; + n = 0; + } + } + if (n > 0) { + // We still need to skip more, so call the file API for skipping + s = file_->Skip(n); + if (s.ok()) { + read_offset_ += n; + } + buffer_.Clear(); + } + return s; + } + + Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override { + return file_->PositionedRead(offset, n, result, scratch); + } + + Status InvalidateCache(size_t offset, size_t length) override { + std::unique_lock lk(lock_); + buffer_.Clear(); + return file_->InvalidateCache(offset, length); + } + + bool use_direct_io() const override { return file_->use_direct_io(); } + + private: + // Tries to read from buffer_ n bytes. If anything was read from the cache, it + // sets cached_len to the number of bytes actually read, copies these number + // of bytes to scratch and returns true. + // If nothing was read sets cached_len to 0 and returns false. + bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) { + if (read_offset_ < buffer_offset_ || + read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) { + *cached_len = 0; + return false; + } + uint64_t offset_in_buffer = read_offset_ - buffer_offset_; + *cached_len = std::min( + buffer_.CurrentSize() - static_cast(offset_in_buffer), n); + memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); + read_offset_ += *cached_len; + return true; + } + + // Reads into buffer_ the next n bytes from file_. + // Can actually read less if EOF was reached. + // Returns the status of the read operastion on the file. + Status ReadIntoBuffer(size_t n) { + if (n > buffer_.Capacity()) { + n = buffer_.Capacity(); + } + assert(IsFileSectorAligned(n, alignment_)); + Slice result; + Status s = file_->Read(n, &result, buffer_.BufferStart()); + if (s.ok()) { + buffer_offset_ = read_offset_; + buffer_.Size(result.size()); + assert(result.size() == 0 || buffer_.BufferStart() == result.data()); + } + return s; + } + + const std::unique_ptr file_; + const size_t alignment_; + const size_t readahead_size_; + + std::mutex lock_; + // The buffer storing the prefetched data + AlignedBuffer buffer_; + // The offset in file_, corresponding to data stored in buffer_ + uint64_t buffer_offset_; + // The offset up to which data was read from file_. In fact, it can be larger + // than the actual file size, since the file_->Skip(n) call doesn't return the + // actual number of bytes that were skipped, which can be less than n. + // This is not a problemm since read_offset_ is monotonically increasing and + // its only use is to figure out if next piece of data should be read from + // buffer_ or file_ directly. + uint64_t read_offset_; +}; +} // namespace + +std::unique_ptr +SequentialFileReader::NewReadaheadSequentialFile( + std::unique_ptr&& file, size_t readahead_size) { + if (file->GetRequiredBufferAlignment() >= readahead_size) { + // Short-circuit and return the original file if readahead_size is + // too small and hence doesn't make sense to be used for prefetching. + return std::move(file); + } + std::unique_ptr result( + new ReadaheadSequentialFile(std::move(file), readahead_size)); + return result; +} +} // namespace rocksdb diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h new file mode 100644 index 0000000000..6a6350e1d6 --- /dev/null +++ b/file/sequence_file_reader.h @@ -0,0 +1,66 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include "port/port.h" +#include "rocksdb/env.h" + +namespace rocksdb { + +// SequentialFileReader is a wrapper on top of Env::SequentialFile. It handles +// Buffered (i.e when page cache is enabled) and Direct (with O_DIRECT / page +// cache disabled) reads appropriately, and also updates the IO stats. +class SequentialFileReader { + private: + std::unique_ptr file_; + std::string file_name_; + std::atomic offset_{0}; // read offset + + public: + explicit SequentialFileReader(std::unique_ptr&& _file, + const std::string& _file_name) + : file_(std::move(_file)), file_name_(_file_name) {} + + explicit SequentialFileReader(std::unique_ptr&& _file, + const std::string& _file_name, + size_t _readahead_size) + : file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)), + file_name_(_file_name) {} + + SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + *this = std::move(o); + } + + SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { + file_ = std::move(o.file_); + return *this; + } + + SequentialFileReader(const SequentialFileReader&) = delete; + SequentialFileReader& operator=(const SequentialFileReader&) = delete; + + Status Read(size_t n, Slice* result, char* scratch); + + Status Skip(uint64_t n); + + SequentialFile* file() { return file_.get(); } + + std::string file_name() { return file_name_; } + + bool use_direct_io() const { return file_->use_direct_io(); } + + private: + // NewReadaheadSequentialFile provides a wrapper over SequentialFile to + // always prefetch additional data with every read. + static std::unique_ptr NewReadaheadSequentialFile( + std::unique_ptr&& file, size_t readahead_size); +}; +} // namespace rocksdb diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc new file mode 100644 index 0000000000..277e55500c --- /dev/null +++ b/file/writable_file_writer.cc @@ -0,0 +1,405 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "file/writable_file_writer.h" + +#include +#include + +#include "monitoring/histogram.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "test_util/sync_point.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace rocksdb { +Status WritableFileWriter::Append(const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + Status s; + pending_sync_ = true; + + TEST_KILL_RANDOM("WritableFileWriter::Append:0", + rocksdb_kill_odds * REDUCE_ODDS2); + + { + IOSTATS_TIMER_GUARD(prepare_write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); + writable_file_->PrepareWrite(static_cast(GetFileSize()), left); + } + + // See whether we need to enlarge the buffer to avoid the flush + if (buf_.Capacity() - buf_.CurrentSize() < left) { + for (size_t cap = buf_.Capacity(); + cap < max_buffer_size_; // There is still room to increase + cap *= 2) { + // See whether the next available size is large enough. + // Buffer will never be increased to more than max_buffer_size_. + size_t desired_capacity = std::min(cap * 2, max_buffer_size_); + if (desired_capacity - buf_.CurrentSize() >= left || + (use_direct_io() && desired_capacity == max_buffer_size_)) { + buf_.AllocateNewBuffer(desired_capacity, true); + break; + } + } + } + + // Flush only when buffered I/O + if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { + if (buf_.CurrentSize() > 0) { + s = Flush(); + if (!s.ok()) { + return s; + } + } + assert(buf_.CurrentSize() == 0); + } + + // We never write directly to disk with direct I/O on. + // or we simply use it for its original purpose to accumulate many small + // chunks + if (use_direct_io() || (buf_.Capacity() >= left)) { + while (left > 0) { + size_t appended = buf_.Append(src, left); + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); + if (!s.ok()) { + break; + } + } + } + } else { + // Writing directly to file bypassing the buffer + assert(buf_.CurrentSize() == 0); + s = WriteBuffered(src, left); + } + + TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds); + if (s.ok()) { + filesize_ += data.size(); + } + return s; +} + +Status WritableFileWriter::Pad(const size_t pad_bytes) { + assert(pad_bytes < kDefaultPageSize); + size_t left = pad_bytes; + size_t cap = buf_.Capacity() - buf_.CurrentSize(); + + // Assume pad_bytes is small compared to buf_ capacity. So we always + // use buf_ rather than write directly to file in certain cases like + // Append() does. + while (left) { + size_t append_bytes = std::min(cap, left); + buf_.PadWith(append_bytes, 0); + left -= append_bytes; + if (left > 0) { + Status s = Flush(); + if (!s.ok()) { + return s; + } + } + cap = buf_.Capacity() - buf_.CurrentSize(); + } + pending_sync_ = true; + filesize_ += pad_bytes; + return Status::OK(); +} + +Status WritableFileWriter::Close() { + // Do not quit immediately on failure the file MUST be closed + Status s; + + // Possible to close it twice now as we MUST close + // in __dtor, simply flushing is not enough + // Windows when pre-allocating does not fill with zeros + // also with unbuffered access we also set the end of data. + if (!writable_file_) { + return s; + } + + s = Flush(); // flush cache to OS + + Status interim; + // In direct I/O mode we write whole pages so + // we need to let the file know where data ends. + if (use_direct_io()) { + interim = writable_file_->Truncate(filesize_); + if (interim.ok()) { + interim = writable_file_->Fsync(); + } + if (!interim.ok() && s.ok()) { + s = interim; + } + } + + TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds); + interim = writable_file_->Close(); + if (!interim.ok() && s.ok()) { + s = interim; + } + + writable_file_.reset(); + TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds); + + return s; +} + +// write out the cached data to the OS cache or storage if direct I/O +// enabled +Status WritableFileWriter::Flush() { + Status s; + TEST_KILL_RANDOM("WritableFileWriter::Flush:0", + rocksdb_kill_odds * REDUCE_ODDS2); + + if (buf_.CurrentSize() > 0) { + if (use_direct_io()) { +#ifndef ROCKSDB_LITE + if (pending_sync_) { + s = WriteDirect(); + } +#endif // !ROCKSDB_LITE + } else { + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + } + if (!s.ok()) { + return s; + } + } + + s = writable_file_->Flush(); + + if (!s.ok()) { + return s; + } + + // sync OS cache to disk for every bytes_per_sync_ + // TODO: give log file and sst file different options (log + // files could be potentially cached in OS for their whole + // life time, thus we might not want to flush at all). + + // We try to avoid sync to the last 1MB of data. For two reasons: + // (1) avoid rewrite the same page that is modified later. + // (2) for older version of OS, write can block while writing out + // the page. + // Xfs does neighbor page flushing outside of the specified ranges. We + // need to make sure sync range is far from the write offset. + if (!use_direct_io() && bytes_per_sync_) { + const uint64_t kBytesNotSyncRange = + 1024 * 1024; // recent 1MB is not synced. + const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. + if (filesize_ > kBytesNotSyncRange) { + uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; + offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; + assert(offset_sync_to >= last_sync_size_); + if (offset_sync_to > 0 && + offset_sync_to - last_sync_size_ >= bytes_per_sync_) { + s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); + last_sync_size_ = offset_sync_to; + } + } + } + + return s; +} + +Status WritableFileWriter::Sync(bool use_fsync) { + Status s = Flush(); + if (!s.ok()) { + return s; + } + TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds); + if (!use_direct_io() && pending_sync_) { + s = SyncInternal(use_fsync); + if (!s.ok()) { + return s; + } + } + TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds); + pending_sync_ = false; + return Status::OK(); +} + +Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) { + if (!writable_file_->IsSyncThreadSafe()) { + return Status::NotSupported( + "Can't WritableFileWriter::SyncWithoutFlush() because " + "WritableFile::IsSyncThreadSafe() is false"); + } + TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); + Status s = SyncInternal(use_fsync); + TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); + return s; +} + +Status WritableFileWriter::SyncInternal(bool use_fsync) { + Status s; + IOSTATS_TIMER_GUARD(fsync_nanos); + TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); + auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); + if (use_fsync) { + s = writable_file_->Fsync(); + } else { + s = writable_file_->Sync(); + } + SetPerfLevel(prev_perf_level); + return s; +} + +Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { + IOSTATS_TIMER_GUARD(range_sync_nanos); + TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); + return writable_file_->RangeSync(offset, nbytes); +} + +// This method writes to disk the specified data and makes use of the rate +// limiter if available +Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { + Status s; + assert(!use_direct_io()); + const char* src = data; + size_t left = size; + + while (left > 0) { + size_t allowed; + if (rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, + RateLimiter::OpType::kWrite); + } else { + allowed = left; + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + +#ifndef ROCKSDB_LITE + FileOperationInfo::TimePoint start_ts; + uint64_t old_size = writable_file_->GetFileSize(); + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + old_size = next_write_offset_; + } +#endif + { + auto prev_perf_level = GetPerfLevel(); + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); + s = writable_file_->Append(Slice(src, allowed)); + SetPerfLevel(prev_perf_level); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s); + } +#endif + if (!s.ok()) { + return s; + } + } + + IOSTATS_ADD(bytes_written, allowed); + TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds); + + left -= allowed; + src += allowed; + } + buf_.Size(0); + return s; +} + +// This flushes the accumulated data in the buffer. We pad data with zeros if +// necessary to the whole page. +// However, during automatic flushes padding would not be necessary. +// We always use RateLimiter if available. We move (Refit) any buffer bytes +// that are left over the +// whole number of pages to be written again on the next flush because we can +// only write on aligned +// offsets. +#ifndef ROCKSDB_LITE +Status WritableFileWriter::WriteDirect() { + assert(use_direct_io()); + Status s; + const size_t alignment = buf_.Alignment(); + assert((next_write_offset_ % alignment) == 0); + + // Calculate whole page final file advance if all writes succeed + size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize()); + + // Calculate the leftover tail, we write it here padded with zeros BUT we + // will write + // it again in the future either on Close() OR when the current whole page + // fills out + size_t leftover_tail = buf_.CurrentSize() - file_advance; + + // Round up and pad + buf_.PadToAlignmentWith(0); + + const char* src = buf_.BufferStart(); + uint64_t write_offset = next_write_offset_; + size_t left = buf_.CurrentSize(); + + while (left > 0) { + // Check how much is allowed + size_t size; + if (rate_limiter_ != nullptr) { + size = rate_limiter_->RequestToken(left, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + } else { + size = left; + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + FileOperationInfo::TimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = std::chrono::system_clock::now(); + } + // direct writes must be positional + s = writable_file_->PositionedAppend(Slice(src, size), write_offset); + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::system_clock::now(); + NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s); + } + if (!s.ok()) { + buf_.Size(file_advance + leftover_tail); + return s; + } + } + + IOSTATS_ADD(bytes_written, size); + left -= size; + src += size; + write_offset += size; + assert((next_write_offset_ % alignment) == 0); + } + + if (s.ok()) { + // Move the tail to the beginning of the buffer + // This never happens during normal Append but rather during + // explicit call to Flush()/Sync() or Close() + buf_.RefitTail(file_advance, leftover_tail); + // This is where we start writing next time which may or not be + // the actual file size on disk. They match if the buffer size + // is a multiple of whole pages otherwise filesize_ is leftover_tail + // behind + next_write_offset_ += file_advance; + } + return s; +} +#endif // !ROCKSDB_LITE +} // namespace rocksdb diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h new file mode 100644 index 0000000000..09d612233a --- /dev/null +++ b/file/writable_file_writer.h @@ -0,0 +1,155 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/listener.h" +#include "rocksdb/rate_limiter.h" +#include "test_util/sync_point.h" +#include "util/aligned_buffer.h" + +namespace rocksdb { + +class Statistics; + +// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides +// facilities to: +// - Handle Buffered and Direct writes. +// - Rate limit writes. +// - Flush and Sync the data to the underlying filesystem. +// - Notify any interested listeners on the completion of a write. +// - Update IO stats. +class WritableFileWriter { + private: +#ifndef ROCKSDB_LITE + void NotifyOnFileWriteFinish(uint64_t offset, size_t length, + const FileOperationInfo::TimePoint& start_ts, + const FileOperationInfo::TimePoint& finish_ts, + const Status& status) { + FileOperationInfo info(file_name_, start_ts, finish_ts); + info.offset = offset; + info.length = length; + info.status = status; + + for (auto& listener : listeners_) { + listener->OnFileWriteFinish(info); + } + } +#endif // ROCKSDB_LITE + + bool ShouldNotifyListeners() const { return !listeners_.empty(); } + + std::unique_ptr writable_file_; + std::string file_name_; + Env* env_; + AlignedBuffer buf_; + size_t max_buffer_size_; + // Actually written data size can be used for truncate + // not counting padding data + uint64_t filesize_; +#ifndef ROCKSDB_LITE + // This is necessary when we use unbuffered access + // and writes must happen on aligned offsets + // so we need to go back and write that page again + uint64_t next_write_offset_; +#endif // ROCKSDB_LITE + bool pending_sync_; + uint64_t last_sync_size_; + uint64_t bytes_per_sync_; + RateLimiter* rate_limiter_; + Statistics* stats_; + std::vector> listeners_; + + public: + WritableFileWriter( + std::unique_ptr&& file, const std::string& _file_name, + const EnvOptions& options, Env* env = nullptr, + Statistics* stats = nullptr, + const std::vector>& listeners = {}) + : writable_file_(std::move(file)), + file_name_(_file_name), + env_(env), + buf_(), + max_buffer_size_(options.writable_file_max_buffer_size), + filesize_(0), +#ifndef ROCKSDB_LITE + next_write_offset_(0), +#endif // ROCKSDB_LITE + pending_sync_(false), + last_sync_size_(0), + bytes_per_sync_(options.bytes_per_sync), + rate_limiter_(options.rate_limiter), + stats_(stats), + listeners_() { + TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", + reinterpret_cast(max_buffer_size_)); + buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); + buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); +#ifndef ROCKSDB_LITE + std::for_each(listeners.begin(), listeners.end(), + [this](const std::shared_ptr& e) { + if (e->ShouldBeNotifiedOnFileIO()) { + listeners_.emplace_back(e); + } + }); +#else // !ROCKSDB_LITE + (void)listeners; +#endif + } + + WritableFileWriter(const WritableFileWriter&) = delete; + + WritableFileWriter& operator=(const WritableFileWriter&) = delete; + + ~WritableFileWriter() { Close(); } + + std::string file_name() const { return file_name_; } + + Status Append(const Slice& data); + + Status Pad(const size_t pad_bytes); + + Status Flush(); + + Status Close(); + + Status Sync(bool use_fsync); + + // Sync only the data that was already Flush()ed. Safe to call concurrently + // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(), + // returns NotSupported status. + Status SyncWithoutFlush(bool use_fsync); + + uint64_t GetFileSize() const { return filesize_; } + + Status InvalidateCache(size_t offset, size_t length) { + return writable_file_->InvalidateCache(offset, length); + } + + WritableFile* writable_file() const { return writable_file_.get(); } + + bool use_direct_io() { return writable_file_->use_direct_io(); } + + bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } + + private: + // Used when os buffering is OFF and we are writing + // DMA such as in Direct I/O mode +#ifndef ROCKSDB_LITE + Status WriteDirect(); +#endif // !ROCKSDB_LITE + // Normal write + Status WriteBuffered(const char* data, size_t size); + Status RangeSync(uint64_t offset, uint64_t nbytes); + Status SyncInternal(bool use_fsync); +}; +} // namespace rocksdb diff --git a/logging/env_logger.h b/logging/env_logger.h index 94cf129228..5d7ff7afe1 100644 --- a/logging/env_logger.h +++ b/logging/env_logger.h @@ -16,11 +16,11 @@ #include "port/sys_time.h" #include +#include "file/writable_file_writer.h" #include "monitoring/iostats_context_imp.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "test_util/sync_point.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" namespace rocksdb { diff --git a/options/options_parser.cc b/options/options_parser.cc index d5b0c25a32..6d38f01926 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -13,12 +13,13 @@ #include #include +#include "file/read_write_util.h" +#include "file/writable_file_writer.h" #include "options/options_helper.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "test_util/sync_point.h" #include "util/cast_util.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" #include "port/port.h" diff --git a/src.mk b/src.mk index 7a555253da..42d6ae70b0 100644 --- a/src.mk +++ b/src.mk @@ -71,9 +71,15 @@ LIB_SOURCES = \ env/io_posix.cc \ env/mock_env.cc \ file/delete_scheduler.cc \ + file/file_prefetch_buffer.cc \ file/file_util.cc \ file/filename.cc \ + file/random_access_file_reader.cc \ + file/read_write_util.cc \ + file/readahead_raf.cc \ + file/sequence_file_reader.cc \ file/sst_file_manager_impl.cc \ + file/writable_file_writer.cc \ logging/auto_roll_logger.cc \ logging/event_logger.cc \ logging/log_buffer.cc \ @@ -159,7 +165,6 @@ LIB_SOURCES = \ util/concurrent_task_limiter_impl.cc \ util/crc32c.cc \ util/dynamic_bloom.cc \ - util/file_reader_writer.cc \ util/filter_policy.cc \ util/hash.cc \ util/murmurhash.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 0efd5e3c12..f6afab43fe 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -18,6 +18,9 @@ #include "db/dbformat.h" #include "db/pinned_iterators_manager.h" +#include "file/file_prefetch_buffer.h" +#include "file/random_access_file_reader.h" + #include "rocksdb/cache.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" @@ -49,7 +52,6 @@ #include "test_util/sync_point.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/xxhash.h" diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index c8e8ea006f..b18dccd11d 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -18,6 +18,7 @@ #include "db/range_tombstone_fragmenter.h" #include "file/filename.h" +#include "file/random_access_file_reader.h" #include "options/cf_options.h" #include "rocksdb/options.h" #include "rocksdb/persistent_cache.h" @@ -39,7 +40,6 @@ #include "table/two_level_iterator.h" #include "trace_replay/block_cache_tracer.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/user_comparator_wrapper.h" namespace rocksdb { diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 81e1345d9c..3e9f6ff3f0 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -23,7 +23,6 @@ #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/xxhash.h" diff --git a/table/cuckoo/cuckoo_table_builder.cc b/table/cuckoo/cuckoo_table_builder.cc index f1a64cb6a6..8857cf7ea9 100644 --- a/table/cuckoo/cuckoo_table_builder.cc +++ b/table/cuckoo/cuckoo_table_builder.cc @@ -13,6 +13,7 @@ #include #include "db/dbformat.h" +#include "file/writable_file_writer.h" #include "rocksdb/env.h" #include "rocksdb/table.h" #include "table/block_based/block_builder.h" @@ -20,7 +21,6 @@ #include "table/format.h" #include "table/meta_blocks.h" #include "util/autovector.h" -#include "util/file_reader_writer.h" #include "util/random.h" #include "util/string_util.h" diff --git a/table/cuckoo/cuckoo_table_builder_test.cc b/table/cuckoo/cuckoo_table_builder_test.cc index f9d46c03bd..b84cc9f5bc 100644 --- a/table/cuckoo/cuckoo_table_builder_test.cc +++ b/table/cuckoo/cuckoo_table_builder_test.cc @@ -10,11 +10,12 @@ #include #include +#include "file/random_access_file_reader.h" +#include "file/writable_file_writer.h" #include "table/cuckoo/cuckoo_table_builder.h" #include "table/meta_blocks.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" namespace rocksdb { extern const uint64_t kCuckooTableMagicNumber; diff --git a/table/cuckoo/cuckoo_table_reader.h b/table/cuckoo/cuckoo_table_reader.h index ea33ffb2a8..d90a147573 100644 --- a/table/cuckoo/cuckoo_table_reader.h +++ b/table/cuckoo/cuckoo_table_reader.h @@ -15,11 +15,11 @@ #include #include "db/dbformat.h" +#include "file/random_access_file_reader.h" #include "options/cf_options.h" #include "rocksdb/env.h" #include "rocksdb/options.h" #include "table/table_reader.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/format.cc b/table/format.cc index b3eb281a2e..6b9125de76 100644 --- a/table/format.cc +++ b/table/format.cc @@ -13,6 +13,7 @@ #include #include "block_fetcher.h" +#include "file/random_access_file_reader.h" #include "logging/logging.h" #include "memory/memory_allocator.h" #include "monitoring/perf_context_imp.h" @@ -24,7 +25,6 @@ #include "util/coding.h" #include "util/compression.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/xxhash.h" diff --git a/table/format.h b/table/format.h index effc13adda..ef323a6471 100644 --- a/table/format.h +++ b/table/format.h @@ -17,6 +17,9 @@ #include #endif #endif +#include "file/file_prefetch_buffer.h" +#include "file/random_access_file_reader.h" + #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -27,7 +30,6 @@ #include "port/port.h" // noexcept #include "table/persistent_cache_options.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/xxhash.h" namespace rocksdb { diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index a5aa5c49ea..d6648bc381 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -12,6 +12,7 @@ #include #include "table/internal_iterator.h" +#include "test_util/sync_point.h" namespace rocksdb { diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 3bbc6d8708..1ba52d6e1c 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -9,6 +9,7 @@ #include "block_fetcher.h" #include "db/table_properties_collector.h" +#include "file/random_access_file_reader.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "table/block_based/block.h" @@ -18,7 +19,6 @@ #include "table/table_properties_internal.h" #include "test_util/sync_point.h" #include "util/coding.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/mock_table.cc b/table/mock_table.cc index 022f9a63f5..551c1ba5d1 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -6,11 +6,11 @@ #include "table/mock_table.h" #include "db/dbformat.h" +#include "file/random_access_file_reader.h" #include "port/port.h" #include "rocksdb/table_properties.h" #include "table/get_context.h" #include "util/coding.h" -#include "util/file_reader_writer.h" namespace rocksdb { namespace mock { diff --git a/table/plain/plain_table_builder.cc b/table/plain/plain_table_builder.cc index 8a51b64e60..696340525a 100644 --- a/table/plain/plain_table_builder.cc +++ b/table/plain/plain_table_builder.cc @@ -13,6 +13,7 @@ #include #include "db/dbformat.h" +#include "file/writable_file_writer.h" #include "rocksdb/comparator.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" @@ -26,7 +27,6 @@ #include "table/plain/plain_table_index.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" namespace rocksdb { diff --git a/table/plain/plain_table_key_coding.cc b/table/plain/plain_table_key_coding.cc index c84f337eb4..b70ce65e67 100644 --- a/table/plain/plain_table_key_coding.cc +++ b/table/plain/plain_table_key_coding.cc @@ -9,9 +9,9 @@ #include #include #include "db/dbformat.h" +#include "file/writable_file_writer.h" #include "table/plain/plain_table_factory.h" #include "table/plain/plain_table_reader.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/plain/plain_table_reader.h b/table/plain/plain_table_reader.h index f95616cc57..fa248d54ff 100644 --- a/table/plain/plain_table_reader.h +++ b/table/plain/plain_table_reader.h @@ -13,6 +13,7 @@ #include #include "db/dbformat.h" +#include "file/random_access_file_reader.h" #include "memory/arena.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" @@ -23,7 +24,6 @@ #include "table/plain/plain_table_factory.h" #include "table/plain/plain_table_index.h" #include "table/table_reader.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/sst_file_reader.cc b/table/sst_file_reader.cc index cc892c25b2..48db1d8b41 100644 --- a/table/sst_file_reader.cc +++ b/table/sst_file_reader.cc @@ -9,11 +9,11 @@ #include "db/db_iter.h" #include "db/dbformat.h" +#include "file/random_access_file_reader.h" #include "options/cf_options.h" #include "table/get_context.h" #include "table/table_builder.h" #include "table/table_reader.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index b53f3161e3..dc2c589f21 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -8,11 +8,11 @@ #include #include "db/dbformat.h" +#include "file/writable_file_writer.h" #include "rocksdb/table.h" #include "table/block_based/block_based_table_builder.h" #include "table/sst_file_writer_collectors.h" #include "test_util/sync_point.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/table_builder.h b/table/table_builder.h index 23189200c6..4a4b19b626 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -15,11 +15,11 @@ #include #include "db/dbformat.h" #include "db/table_properties_collector.h" +#include "file/writable_file_writer.h" #include "options/cf_options.h" #include "rocksdb/options.h" #include "rocksdb/table_properties.h" #include "trace_replay/block_cache_tracer.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 45d760f0ef..05bb2ea25e 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -13,6 +13,7 @@ int main() { #include "db/db_impl/db_impl.h" #include "db/dbformat.h" +#include "file/random_access_file_reader.h" #include "monitoring/histogram.h" #include "rocksdb/db.h" #include "rocksdb/slice_transform.h" @@ -24,7 +25,6 @@ int main() { #include "table/table_builder.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" #include "util/gflags_compat.h" using GFLAGS_NAMESPACE::ParseCommandLineFlags; diff --git a/test_util/testutil.cc b/test_util/testutil.cc index f3e71bebce..3bf0e878c5 100644 --- a/test_util/testutil.cc +++ b/test_util/testutil.cc @@ -15,8 +15,10 @@ #include #include "db/memtable_list.h" +#include "file/random_access_file_reader.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" #include "port/port.h" -#include "util/file_reader_writer.h" namespace rocksdb { namespace test { diff --git a/tools/sst_dump_test.cc b/tools/sst_dump_test.cc index d3b1f0e581..d7391ebab9 100644 --- a/tools/sst_dump_test.cc +++ b/tools/sst_dump_test.cc @@ -12,12 +12,12 @@ #include #include "rocksdb/sst_dump_tool.h" +#include "file/random_access_file_reader.h" #include "rocksdb/filter_policy.h" #include "table/block_based/block_based_table_factory.h" #include "table/table_builder.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/tools/sst_dump_tool_imp.h b/tools/sst_dump_tool_imp.h index 51c15c8aa9..f8d5443528 100644 --- a/tools/sst_dump_tool_imp.h +++ b/tools/sst_dump_tool_imp.h @@ -10,8 +10,8 @@ #include #include #include "db/dbformat.h" +#include "file/writable_file_writer.h" #include "options/cf_options.h" -#include "util/file_reader_writer.h" namespace rocksdb { diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index dcc954384f..f277922411 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -23,6 +23,7 @@ int main() { #include #include "db/db_test_util.h" +#include "file/read_write_util.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/status.h" diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 9ee746af4a..c5576873db 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -26,6 +26,8 @@ #include "db/db_impl/db_impl.h" #include "db/memtable.h" #include "db/write_batch_internal.h" +#include "file/read_write_util.h" +#include "file/writable_file_writer.h" #include "options/cf_options.h" #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -43,7 +45,6 @@ #include "trace_replay/trace_replay.h" #include "util/coding.h" #include "util/compression.h" -#include "util/file_reader_writer.h" #include "util/gflags_compat.h" #include "util/random.h" #include "util/string_util.h" diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc deleted file mode 100644 index b6a5eefcfd..0000000000 --- a/util/file_reader_writer.cc +++ /dev/null @@ -1,1085 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -// -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#include "util/file_reader_writer.h" - -#include -#include - -#include "monitoring/histogram.h" -#include "monitoring/iostats_context_imp.h" -#include "port/port.h" -#include "test_util/sync_point.h" -#include "util/random.h" -#include "util/rate_limiter.h" - -namespace rocksdb { - -#ifndef NDEBUG -namespace { -bool IsFileSectorAligned(const size_t off, size_t sector_size) { - return off % sector_size == 0; -} -} -#endif - -Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { - Status s; - if (use_direct_io()) { -#ifndef ROCKSDB_LITE - size_t offset = offset_.fetch_add(n); - size_t alignment = file_->GetRequiredBufferAlignment(); - size_t aligned_offset = TruncateToPageBoundary(alignment, offset); - size_t offset_advance = offset - aligned_offset; - size_t size = Roundup(offset + n, alignment) - aligned_offset; - size_t r = 0; - AlignedBuffer buf; - buf.Alignment(alignment); - buf.AllocateNewBuffer(size); - Slice tmp; - s = file_->PositionedRead(aligned_offset, size, &tmp, buf.BufferStart()); - if (s.ok() && offset_advance < tmp.size()) { - buf.Size(tmp.size()); - r = buf.Read(scratch, offset_advance, - std::min(tmp.size() - offset_advance, n)); - } - *result = Slice(scratch, r); -#endif // !ROCKSDB_LITE - } else { - s = file_->Read(n, result, scratch); - } - IOSTATS_ADD(bytes_read, result->size()); - return s; -} - - -Status SequentialFileReader::Skip(uint64_t n) { -#ifndef ROCKSDB_LITE - if (use_direct_io()) { - offset_ += static_cast(n); - return Status::OK(); - } -#endif // !ROCKSDB_LITE - return file_->Skip(n); -} - -Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, - char* scratch, bool for_compaction) const { - Status s; - uint64_t elapsed = 0; - { - StopWatch sw(env_, stats_, hist_type_, - (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, - true /*delay_enabled*/); - auto prev_perf_level = GetPerfLevel(); - IOSTATS_TIMER_GUARD(read_nanos); - if (use_direct_io()) { -#ifndef ROCKSDB_LITE - size_t alignment = file_->GetRequiredBufferAlignment(); - size_t aligned_offset = TruncateToPageBoundary(alignment, static_cast(offset)); - size_t offset_advance = static_cast(offset) - aligned_offset; - size_t read_size = Roundup(static_cast(offset + n), alignment) - aligned_offset; - AlignedBuffer buf; - buf.Alignment(alignment); - buf.AllocateNewBuffer(read_size); - while (buf.CurrentSize() < read_size) { - size_t allowed; - if (for_compaction && rate_limiter_ != nullptr) { - allowed = rate_limiter_->RequestToken( - buf.Capacity() - buf.CurrentSize(), buf.Alignment(), - Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); - } else { - assert(buf.CurrentSize() == 0); - allowed = read_size; - } - Slice tmp; - - FileOperationInfo::TimePoint start_ts; - uint64_t orig_offset = 0; - if (ShouldNotifyListeners()) { - start_ts = std::chrono::system_clock::now(); - orig_offset = aligned_offset + buf.CurrentSize(); - } - { - IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, &tmp, - buf.Destination()); - } - if (ShouldNotifyListeners()) { - auto finish_ts = std::chrono::system_clock::now(); - NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, - s); - } - - buf.Size(buf.CurrentSize() + tmp.size()); - if (!s.ok() || tmp.size() < allowed) { - break; - } - } - size_t res_len = 0; - if (s.ok() && offset_advance < buf.CurrentSize()) { - res_len = buf.Read(scratch, offset_advance, - std::min(buf.CurrentSize() - offset_advance, n)); - } - *result = Slice(scratch, res_len); -#endif // !ROCKSDB_LITE - } else { - size_t pos = 0; - const char* res_scratch = nullptr; - while (pos < n) { - size_t allowed; - if (for_compaction && rate_limiter_ != nullptr) { - if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { - sw.DelayStart(); - } - allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, - Env::IOPriority::IO_LOW, stats_, - RateLimiter::OpType::kRead); - if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { - sw.DelayStop(); - } - } else { - allowed = n; - } - Slice tmp_result; - -#ifndef ROCKSDB_LITE - FileOperationInfo::TimePoint start_ts; - if (ShouldNotifyListeners()) { - start_ts = std::chrono::system_clock::now(); - } -#endif - { - IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->Read(offset + pos, allowed, &tmp_result, scratch + pos); - } -#ifndef ROCKSDB_LITE - if (ShouldNotifyListeners()) { - auto finish_ts = std::chrono::system_clock::now(); - NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts, - finish_ts, s); - } -#endif - - if (res_scratch == nullptr) { - // we can't simply use `scratch` because reads of mmap'd files return - // data in a different buffer. - res_scratch = tmp_result.data(); - } else { - // make sure chunks are inserted contiguously into `res_scratch`. - assert(tmp_result.data() == res_scratch + pos); - } - pos += tmp_result.size(); - if (!s.ok() || tmp_result.size() < allowed) { - break; - } - } - *result = Slice(res_scratch, s.ok() ? pos : 0); - } - IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size()); - SetPerfLevel(prev_perf_level); - } - if (stats_ != nullptr && file_read_hist_ != nullptr) { - file_read_hist_->Add(elapsed); - } - - return s; -} - -Status RandomAccessFileReader::MultiRead(ReadRequest* read_reqs, - size_t num_reqs) const { - Status s; - uint64_t elapsed = 0; - assert(!use_direct_io()); - { - StopWatch sw(env_, stats_, hist_type_, - (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/, - true /*delay_enabled*/); - auto prev_perf_level = GetPerfLevel(); - IOSTATS_TIMER_GUARD(read_nanos); - -#ifndef ROCKSDB_LITE - FileOperationInfo::TimePoint start_ts; - if (ShouldNotifyListeners()) { - start_ts = std::chrono::system_clock::now(); - } -#endif // ROCKSDB_LITE - { - IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_); - s = file_->MultiRead(read_reqs, num_reqs); - } - for (size_t i = 0; i < num_reqs; ++i) { -#ifndef ROCKSDB_LITE - if (ShouldNotifyListeners()) { - auto finish_ts = std::chrono::system_clock::now(); - NotifyOnFileReadFinish(read_reqs[i].offset, - read_reqs[i].result.size(), start_ts, finish_ts, - read_reqs[i].status); - } -#endif // ROCKSDB_LITE - IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size()); - } - SetPerfLevel(prev_perf_level); - } - if (stats_ != nullptr && file_read_hist_ != nullptr) { - file_read_hist_->Add(elapsed); - } - - return s; -} - -Status WritableFileWriter::Append(const Slice& data) { - const char* src = data.data(); - size_t left = data.size(); - Status s; - pending_sync_ = true; - - TEST_KILL_RANDOM("WritableFileWriter::Append:0", - rocksdb_kill_odds * REDUCE_ODDS2); - - { - IOSTATS_TIMER_GUARD(prepare_write_nanos); - TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite"); - writable_file_->PrepareWrite(static_cast(GetFileSize()), left); - } - - // See whether we need to enlarge the buffer to avoid the flush - if (buf_.Capacity() - buf_.CurrentSize() < left) { - for (size_t cap = buf_.Capacity(); - cap < max_buffer_size_; // There is still room to increase - cap *= 2) { - // See whether the next available size is large enough. - // Buffer will never be increased to more than max_buffer_size_. - size_t desired_capacity = std::min(cap * 2, max_buffer_size_); - if (desired_capacity - buf_.CurrentSize() >= left || - (use_direct_io() && desired_capacity == max_buffer_size_)) { - buf_.AllocateNewBuffer(desired_capacity, true); - break; - } - } - } - - // Flush only when buffered I/O - if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) { - if (buf_.CurrentSize() > 0) { - s = Flush(); - if (!s.ok()) { - return s; - } - } - assert(buf_.CurrentSize() == 0); - } - - // We never write directly to disk with direct I/O on. - // or we simply use it for its original purpose to accumulate many small - // chunks - if (use_direct_io() || (buf_.Capacity() >= left)) { - while (left > 0) { - size_t appended = buf_.Append(src, left); - left -= appended; - src += appended; - - if (left > 0) { - s = Flush(); - if (!s.ok()) { - break; - } - } - } - } else { - // Writing directly to file bypassing the buffer - assert(buf_.CurrentSize() == 0); - s = WriteBuffered(src, left); - } - - TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds); - if (s.ok()) { - filesize_ += data.size(); - } - return s; -} - -Status WritableFileWriter::Pad(const size_t pad_bytes) { - assert(pad_bytes < kDefaultPageSize); - size_t left = pad_bytes; - size_t cap = buf_.Capacity() - buf_.CurrentSize(); - - // Assume pad_bytes is small compared to buf_ capacity. So we always - // use buf_ rather than write directly to file in certain cases like - // Append() does. - while (left) { - size_t append_bytes = std::min(cap, left); - buf_.PadWith(append_bytes, 0); - left -= append_bytes; - if (left > 0) { - Status s = Flush(); - if (!s.ok()) { - return s; - } - } - cap = buf_.Capacity() - buf_.CurrentSize(); - } - pending_sync_ = true; - filesize_ += pad_bytes; - return Status::OK(); -} - -Status WritableFileWriter::Close() { - - // Do not quit immediately on failure the file MUST be closed - Status s; - - // Possible to close it twice now as we MUST close - // in __dtor, simply flushing is not enough - // Windows when pre-allocating does not fill with zeros - // also with unbuffered access we also set the end of data. - if (!writable_file_) { - return s; - } - - s = Flush(); // flush cache to OS - - Status interim; - // In direct I/O mode we write whole pages so - // we need to let the file know where data ends. - if (use_direct_io()) { - interim = writable_file_->Truncate(filesize_); - if (interim.ok()) { - interim = writable_file_->Fsync(); - } - if (!interim.ok() && s.ok()) { - s = interim; - } - } - - TEST_KILL_RANDOM("WritableFileWriter::Close:0", rocksdb_kill_odds); - interim = writable_file_->Close(); - if (!interim.ok() && s.ok()) { - s = interim; - } - - writable_file_.reset(); - TEST_KILL_RANDOM("WritableFileWriter::Close:1", rocksdb_kill_odds); - - return s; -} - -// write out the cached data to the OS cache or storage if direct I/O -// enabled -Status WritableFileWriter::Flush() { - Status s; - TEST_KILL_RANDOM("WritableFileWriter::Flush:0", - rocksdb_kill_odds * REDUCE_ODDS2); - - if (buf_.CurrentSize() > 0) { - if (use_direct_io()) { -#ifndef ROCKSDB_LITE - if (pending_sync_) { - s = WriteDirect(); - } -#endif // !ROCKSDB_LITE - } else { - s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); - } - if (!s.ok()) { - return s; - } - } - - s = writable_file_->Flush(); - - if (!s.ok()) { - return s; - } - - // sync OS cache to disk for every bytes_per_sync_ - // TODO: give log file and sst file different options (log - // files could be potentially cached in OS for their whole - // life time, thus we might not want to flush at all). - - // We try to avoid sync to the last 1MB of data. For two reasons: - // (1) avoid rewrite the same page that is modified later. - // (2) for older version of OS, write can block while writing out - // the page. - // Xfs does neighbor page flushing outside of the specified ranges. We - // need to make sure sync range is far from the write offset. - if (!use_direct_io() && bytes_per_sync_) { - const uint64_t kBytesNotSyncRange = 1024 * 1024; // recent 1MB is not synced. - const uint64_t kBytesAlignWhenSync = 4 * 1024; // Align 4KB. - if (filesize_ > kBytesNotSyncRange) { - uint64_t offset_sync_to = filesize_ - kBytesNotSyncRange; - offset_sync_to -= offset_sync_to % kBytesAlignWhenSync; - assert(offset_sync_to >= last_sync_size_); - if (offset_sync_to > 0 && - offset_sync_to - last_sync_size_ >= bytes_per_sync_) { - s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); - last_sync_size_ = offset_sync_to; - } - } - } - - return s; -} - -Status WritableFileWriter::Sync(bool use_fsync) { - Status s = Flush(); - if (!s.ok()) { - return s; - } - TEST_KILL_RANDOM("WritableFileWriter::Sync:0", rocksdb_kill_odds); - if (!use_direct_io() && pending_sync_) { - s = SyncInternal(use_fsync); - if (!s.ok()) { - return s; - } - } - TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds); - pending_sync_ = false; - return Status::OK(); -} - -Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) { - if (!writable_file_->IsSyncThreadSafe()) { - return Status::NotSupported( - "Can't WritableFileWriter::SyncWithoutFlush() because " - "WritableFile::IsSyncThreadSafe() is false"); - } - TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); - Status s = SyncInternal(use_fsync); - TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); - return s; -} - -Status WritableFileWriter::SyncInternal(bool use_fsync) { - Status s; - IOSTATS_TIMER_GUARD(fsync_nanos); - TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); - auto prev_perf_level = GetPerfLevel(); - IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); - if (use_fsync) { - s = writable_file_->Fsync(); - } else { - s = writable_file_->Sync(); - } - SetPerfLevel(prev_perf_level); - return s; -} - -Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { - IOSTATS_TIMER_GUARD(range_sync_nanos); - TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); - return writable_file_->RangeSync(offset, nbytes); -} - -// This method writes to disk the specified data and makes use of the rate -// limiter if available -Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { - Status s; - assert(!use_direct_io()); - const char* src = data; - size_t left = size; - - while (left > 0) { - size_t allowed; - if (rate_limiter_ != nullptr) { - allowed = rate_limiter_->RequestToken( - left, 0 /* alignment */, writable_file_->GetIOPriority(), stats_, - RateLimiter::OpType::kWrite); - } else { - allowed = left; - } - - { - IOSTATS_TIMER_GUARD(write_nanos); - TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); - -#ifndef ROCKSDB_LITE - FileOperationInfo::TimePoint start_ts; - uint64_t old_size = writable_file_->GetFileSize(); - if (ShouldNotifyListeners()) { - start_ts = std::chrono::system_clock::now(); - old_size = next_write_offset_; - } -#endif - { - auto prev_perf_level = GetPerfLevel(); - IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, env_); - s = writable_file_->Append(Slice(src, allowed)); - SetPerfLevel(prev_perf_level); - } -#ifndef ROCKSDB_LITE - if (ShouldNotifyListeners()) { - auto finish_ts = std::chrono::system_clock::now(); - NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s); - } -#endif - if (!s.ok()) { - return s; - } - } - - IOSTATS_ADD(bytes_written, allowed); - TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0", rocksdb_kill_odds); - - left -= allowed; - src += allowed; - } - buf_.Size(0); - return s; -} - - -// This flushes the accumulated data in the buffer. We pad data with zeros if -// necessary to the whole page. -// However, during automatic flushes padding would not be necessary. -// We always use RateLimiter if available. We move (Refit) any buffer bytes -// that are left over the -// whole number of pages to be written again on the next flush because we can -// only write on aligned -// offsets. -#ifndef ROCKSDB_LITE -Status WritableFileWriter::WriteDirect() { - assert(use_direct_io()); - Status s; - const size_t alignment = buf_.Alignment(); - assert((next_write_offset_ % alignment) == 0); - - // Calculate whole page final file advance if all writes succeed - size_t file_advance = - TruncateToPageBoundary(alignment, buf_.CurrentSize()); - - // Calculate the leftover tail, we write it here padded with zeros BUT we - // will write - // it again in the future either on Close() OR when the current whole page - // fills out - size_t leftover_tail = buf_.CurrentSize() - file_advance; - - // Round up and pad - buf_.PadToAlignmentWith(0); - - const char* src = buf_.BufferStart(); - uint64_t write_offset = next_write_offset_; - size_t left = buf_.CurrentSize(); - - while (left > 0) { - // Check how much is allowed - size_t size; - if (rate_limiter_ != nullptr) { - size = rate_limiter_->RequestToken(left, buf_.Alignment(), - writable_file_->GetIOPriority(), - stats_, RateLimiter::OpType::kWrite); - } else { - size = left; - } - - { - IOSTATS_TIMER_GUARD(write_nanos); - TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); - FileOperationInfo::TimePoint start_ts; - if (ShouldNotifyListeners()) { - start_ts = std::chrono::system_clock::now(); - } - // direct writes must be positional - s = writable_file_->PositionedAppend(Slice(src, size), write_offset); - if (ShouldNotifyListeners()) { - auto finish_ts = std::chrono::system_clock::now(); - NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s); - } - if (!s.ok()) { - buf_.Size(file_advance + leftover_tail); - return s; - } - } - - IOSTATS_ADD(bytes_written, size); - left -= size; - src += size; - write_offset += size; - assert((next_write_offset_ % alignment) == 0); - } - - if (s.ok()) { - // Move the tail to the beginning of the buffer - // This never happens during normal Append but rather during - // explicit call to Flush()/Sync() or Close() - buf_.RefitTail(file_advance, leftover_tail); - // This is where we start writing next time which may or not be - // the actual file size on disk. They match if the buffer size - // is a multiple of whole pages otherwise filesize_ is leftover_tail - // behind - next_write_offset_ += file_advance; - } - return s; -} -#endif // !ROCKSDB_LITE - -namespace { -class ReadaheadRandomAccessFile : public RandomAccessFile { - public: - ReadaheadRandomAccessFile(std::unique_ptr&& file, - size_t readahead_size) - : file_(std::move(file)), - alignment_(file_->GetRequiredBufferAlignment()), - readahead_size_(Roundup(readahead_size, alignment_)), - buffer_(), - buffer_offset_(0) { - buffer_.Alignment(alignment_); - buffer_.AllocateNewBuffer(readahead_size_); - } - - ReadaheadRandomAccessFile(const ReadaheadRandomAccessFile&) = delete; - - ReadaheadRandomAccessFile& operator=(const ReadaheadRandomAccessFile&) = delete; - - Status Read(uint64_t offset, size_t n, Slice* result, - char* scratch) const override { - // Read-ahead only make sense if we have some slack left after reading - if (n + alignment_ >= readahead_size_) { - return file_->Read(offset, n, result, scratch); - } - - std::unique_lock lk(lock_); - - size_t cached_len = 0; - // Check if there is a cache hit, meaning that [offset, offset + n) is either - // completely or partially in the buffer. - // If it's completely cached, including end of file case when offset + n is - // greater than EOF, then return. - if (TryReadFromCache(offset, n, &cached_len, scratch) && - (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { - // We read exactly what we needed, or we hit end of file - return. - *result = Slice(scratch, cached_len); - return Status::OK(); - } - size_t advanced_offset = static_cast(offset + cached_len); - // In the case of cache hit advanced_offset is already aligned, means that - // chunk_offset equals to advanced_offset - size_t chunk_offset = TruncateToPageBoundary(alignment_, advanced_offset); - - Status s = ReadIntoBuffer(chunk_offset, readahead_size_); - if (s.ok()) { - // The data we need is now in cache, so we can safely read it - size_t remaining_len; - TryReadFromCache(advanced_offset, n - cached_len, &remaining_len, - scratch + cached_len); - *result = Slice(scratch, cached_len + remaining_len); - } - return s; - } - - Status Prefetch(uint64_t offset, size_t n) override { - if (n < readahead_size_) { - // Don't allow smaller prefetches than the configured `readahead_size_`. - // `Read()` assumes a smaller prefetch buffer indicates EOF was reached. - return Status::OK(); - } - - std::unique_lock lk(lock_); - - size_t offset_ = static_cast(offset); - size_t prefetch_offset = TruncateToPageBoundary(alignment_, offset_); - if (prefetch_offset == buffer_offset_) { - return Status::OK(); - } - return ReadIntoBuffer(prefetch_offset, - Roundup(offset_ + n, alignment_) - prefetch_offset); - } - - size_t GetUniqueId(char* id, size_t max_size) const override { - return file_->GetUniqueId(id, max_size); - } - - void Hint(AccessPattern pattern) override { file_->Hint(pattern); } - - Status InvalidateCache(size_t offset, size_t length) override { - std::unique_lock lk(lock_); - buffer_.Clear(); - return file_->InvalidateCache(offset, length); - } - - bool use_direct_io() const override { return file_->use_direct_io(); } - -private: - // Tries to read from buffer_ n bytes starting at offset. If anything was read - // from the cache, it sets cached_len to the number of bytes actually read, - // copies these number of bytes to scratch and returns true. - // If nothing was read sets cached_len to 0 and returns false. - bool TryReadFromCache(uint64_t offset, size_t n, size_t* cached_len, - char* scratch) const { - if (offset < buffer_offset_ || - offset >= buffer_offset_ + buffer_.CurrentSize()) { - *cached_len = 0; - return false; - } - uint64_t offset_in_buffer = offset - buffer_offset_; - *cached_len = std::min( - buffer_.CurrentSize() - static_cast(offset_in_buffer), n); - memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); - return true; - } - - // Reads into buffer_ the next n bytes from file_ starting at offset. - // Can actually read less if EOF was reached. - // Returns the status of the read operastion on the file. - Status ReadIntoBuffer(uint64_t offset, size_t n) const { - if (n > buffer_.Capacity()) { - n = buffer_.Capacity(); - } - assert(IsFileSectorAligned(offset, alignment_)); - assert(IsFileSectorAligned(n, alignment_)); - Slice result; - Status s = file_->Read(offset, n, &result, buffer_.BufferStart()); - if (s.ok()) { - buffer_offset_ = offset; - buffer_.Size(result.size()); - assert(result.size() == 0 || buffer_.BufferStart() == result.data()); - } - return s; - } - - const std::unique_ptr file_; - const size_t alignment_; - const size_t readahead_size_; - - mutable std::mutex lock_; - // The buffer storing the prefetched data - mutable AlignedBuffer buffer_; - // The offset in file_, corresponding to data stored in buffer_ - mutable uint64_t buffer_offset_; -}; - -// This class wraps a SequentialFile, exposing same API, with the differenece -// of being able to prefetch up to readahead_size bytes and then serve them -// from memory, avoiding the entire round-trip if, for example, the data for the -// file is actually remote. -class ReadaheadSequentialFile : public SequentialFile { - public: - ReadaheadSequentialFile(std::unique_ptr&& file, - size_t readahead_size) - : file_(std::move(file)), - alignment_(file_->GetRequiredBufferAlignment()), - readahead_size_(Roundup(readahead_size, alignment_)), - buffer_(), - buffer_offset_(0), - read_offset_(0) { - buffer_.Alignment(alignment_); - buffer_.AllocateNewBuffer(readahead_size_); - } - - ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete; - - ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete; - - Status Read(size_t n, Slice* result, char* scratch) override { - std::unique_lock lk(lock_); - - size_t cached_len = 0; - // Check if there is a cache hit, meaning that [offset, offset + n) is - // either completely or partially in the buffer. If it's completely cached, - // including end of file case when offset + n is greater than EOF, then - // return. - if (TryReadFromCache(n, &cached_len, scratch) && - (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { - // We read exactly what we needed, or we hit end of file - return. - *result = Slice(scratch, cached_len); - return Status::OK(); - } - n -= cached_len; - - Status s; - // Read-ahead only make sense if we have some slack left after reading - if (n + alignment_ >= readahead_size_) { - s = file_->Read(n, result, scratch + cached_len); - if (s.ok()) { - read_offset_ += result->size(); - *result = Slice(scratch, cached_len + result->size()); - } - buffer_.Clear(); - return s; - } - - s = ReadIntoBuffer(readahead_size_); - if (s.ok()) { - // The data we need is now in cache, so we can safely read it - size_t remaining_len; - TryReadFromCache(n, &remaining_len, scratch + cached_len); - *result = Slice(scratch, cached_len + remaining_len); - } - return s; - } - - Status Skip(uint64_t n) override { - std::unique_lock lk(lock_); - Status s = Status::OK(); - // First check if we need to skip already cached data - if (buffer_.CurrentSize() > 0) { - // Do we need to skip beyond cached data? - if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) { - // Yes. Skip whaterver is in memory and adjust offset accordingly - n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_; - read_offset_ = buffer_offset_ + buffer_.CurrentSize(); - } else { - // No. The entire section to be skipped is entirely i cache. - read_offset_ += n; - n = 0; - } - } - if (n > 0) { - // We still need to skip more, so call the file API for skipping - s = file_->Skip(n); - if (s.ok()) { - read_offset_ += n; - } - buffer_.Clear(); - } - return s; - } - - Status PositionedRead(uint64_t offset, size_t n, Slice* result, - char* scratch) override { - return file_->PositionedRead(offset, n, result, scratch); - } - - Status InvalidateCache(size_t offset, size_t length) override { - std::unique_lock lk(lock_); - buffer_.Clear(); - return file_->InvalidateCache(offset, length); - } - - bool use_direct_io() const override { return file_->use_direct_io(); } - - private: - // Tries to read from buffer_ n bytes. If anything was read from the cache, it - // sets cached_len to the number of bytes actually read, copies these number - // of bytes to scratch and returns true. - // If nothing was read sets cached_len to 0 and returns false. - bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) { - if (read_offset_ < buffer_offset_ || - read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) { - *cached_len = 0; - return false; - } - uint64_t offset_in_buffer = read_offset_ - buffer_offset_; - *cached_len = std::min( - buffer_.CurrentSize() - static_cast(offset_in_buffer), n); - memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); - read_offset_ += *cached_len; - return true; - } - - // Reads into buffer_ the next n bytes from file_. - // Can actually read less if EOF was reached. - // Returns the status of the read operastion on the file. - Status ReadIntoBuffer(size_t n) { - if (n > buffer_.Capacity()) { - n = buffer_.Capacity(); - } - assert(IsFileSectorAligned(n, alignment_)); - Slice result; - Status s = file_->Read(n, &result, buffer_.BufferStart()); - if (s.ok()) { - buffer_offset_ = read_offset_; - buffer_.Size(result.size()); - assert(result.size() == 0 || buffer_.BufferStart() == result.data()); - } - return s; - } - - const std::unique_ptr file_; - const size_t alignment_; - const size_t readahead_size_; - - std::mutex lock_; - // The buffer storing the prefetched data - AlignedBuffer buffer_; - // The offset in file_, corresponding to data stored in buffer_ - uint64_t buffer_offset_; - // The offset up to which data was read from file_. In fact, it can be larger - // than the actual file size, since the file_->Skip(n) call doesn't return the - // actual number of bytes that were skipped, which can be less than n. - // This is not a problemm since read_offset_ is monotonically increasing and - // its only use is to figure out if next piece of data should be read from - // buffer_ or file_ directly. - uint64_t read_offset_; -}; -} // namespace - -Status FilePrefetchBuffer::Prefetch(RandomAccessFileReader* reader, - uint64_t offset, size_t n, - bool for_compaction) { - size_t alignment = reader->file()->GetRequiredBufferAlignment(); - size_t offset_ = static_cast(offset); - uint64_t rounddown_offset = Rounddown(offset_, alignment); - uint64_t roundup_end = Roundup(offset_ + n, alignment); - uint64_t roundup_len = roundup_end - rounddown_offset; - assert(roundup_len >= alignment); - assert(roundup_len % alignment == 0); - - // Check if requested bytes are in the existing buffer_. - // If all bytes exist -- return. - // If only a few bytes exist -- reuse them & read only what is really needed. - // This is typically the case of incremental reading of data. - // If no bytes exist in buffer -- full pread. - - Status s; - uint64_t chunk_offset_in_buffer = 0; - uint64_t chunk_len = 0; - bool copy_data_to_new_buffer = false; - if (buffer_.CurrentSize() > 0 && offset >= buffer_offset_ && - offset <= buffer_offset_ + buffer_.CurrentSize()) { - if (offset + n <= buffer_offset_ + buffer_.CurrentSize()) { - // All requested bytes are already in the buffer. So no need to Read - // again. - return s; - } else { - // Only a few requested bytes are in the buffer. memmove those chunk of - // bytes to the beginning, and memcpy them back into the new buffer if a - // new buffer is created. - chunk_offset_in_buffer = Rounddown(static_cast(offset - buffer_offset_), alignment); - chunk_len = buffer_.CurrentSize() - chunk_offset_in_buffer; - assert(chunk_offset_in_buffer % alignment == 0); - assert(chunk_len % alignment == 0); - assert(chunk_offset_in_buffer + chunk_len <= - buffer_offset_ + buffer_.CurrentSize()); - if (chunk_len > 0) { - copy_data_to_new_buffer = true; - } else { - // this reset is not necessary, but just to be safe. - chunk_offset_in_buffer = 0; - } - } - } - - // Create a new buffer only if current capacity is not sufficient, and memcopy - // bytes from old buffer if needed (i.e., if chunk_len is greater than 0). - if (buffer_.Capacity() < roundup_len) { - buffer_.Alignment(alignment); - buffer_.AllocateNewBuffer(static_cast(roundup_len), - copy_data_to_new_buffer, chunk_offset_in_buffer, - static_cast(chunk_len)); - } else if (chunk_len > 0) { - // New buffer not needed. But memmove bytes from tail to the beginning since - // chunk_len is greater than 0. - buffer_.RefitTail(static_cast(chunk_offset_in_buffer), static_cast(chunk_len)); - } - - Slice result; - s = reader->Read(rounddown_offset + chunk_len, - static_cast(roundup_len - chunk_len), &result, - buffer_.BufferStart() + chunk_len, for_compaction); - if (s.ok()) { - buffer_offset_ = rounddown_offset; - buffer_.Size(static_cast(chunk_len) + result.size()); - } - return s; -} - -bool FilePrefetchBuffer::TryReadFromCache(uint64_t offset, size_t n, - Slice* result, bool for_compaction) { - if (track_min_offset_ && offset < min_offset_read_) { - min_offset_read_ = static_cast(offset); - } - if (!enable_ || offset < buffer_offset_) { - return false; - } - - // If the buffer contains only a few of the requested bytes: - // If readahead is enabled: prefetch the remaining bytes + readadhead bytes - // and satisfy the request. - // If readahead is not enabled: return false. - if (offset + n > buffer_offset_ + buffer_.CurrentSize()) { - if (readahead_size_ > 0) { - assert(file_reader_ != nullptr); - assert(max_readahead_size_ >= readahead_size_); - Status s; - if (for_compaction) { - s = Prefetch(file_reader_, offset, std::max(n, readahead_size_), for_compaction); - } else { - s = Prefetch(file_reader_, offset, n + readahead_size_, for_compaction); - } - if (!s.ok()) { - return false; - } - readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2); - } else { - return false; - } - } - - uint64_t offset_in_buffer = offset - buffer_offset_; - *result = Slice(buffer_.BufferStart() + offset_in_buffer, n); - return true; -} - -std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr&& file, size_t readahead_size) { - std::unique_ptr result( - new ReadaheadRandomAccessFile(std::move(file), readahead_size)); - return result; -} - -std::unique_ptr -SequentialFileReader::NewReadaheadSequentialFile( - std::unique_ptr&& file, size_t readahead_size) { - if (file->GetRequiredBufferAlignment() >= readahead_size) { - // Short-circuit and return the original file if readahead_size is - // too small and hence doesn't make sense to be used for prefetching. - return std::move(file); - } - std::unique_ptr result( - new ReadaheadSequentialFile(std::move(file), readahead_size)); - return result; -} - -Status NewWritableFile(Env* env, const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) { - Status s = env->NewWritableFile(fname, result, options); - TEST_KILL_RANDOM("NewWritableFile:0", rocksdb_kill_odds * REDUCE_ODDS2); - return s; -} - -bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file, - std::string* output, bool* has_data, Status* result) { - const int kBufferSize = 8192; - char buffer[kBufferSize + 1]; - Slice input_slice; - - std::string line; - bool has_complete_line = false; - while (!has_complete_line) { - if (std::getline(*iss, line)) { - has_complete_line = !iss->eof(); - } else { - has_complete_line = false; - } - if (!has_complete_line) { - // if we're not sure whether we have a complete line, - // further read from the file. - if (*has_data) { - *result = seq_file->Read(kBufferSize, &input_slice, buffer); - } - if (input_slice.size() == 0) { - // meaning we have read all the data - *has_data = false; - break; - } else { - iss->str(line + input_slice.ToString()); - // reset the internal state of iss so that we can keep reading it. - iss->clear(); - *has_data = (input_slice.size() == kBufferSize); - continue; - } - } - } - *output = line; - return *has_data || has_complete_line; -} - -} // namespace rocksdb diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h deleted file mode 100644 index a93274644c..0000000000 --- a/util/file_reader_writer.h +++ /dev/null @@ -1,407 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under both the GPLv2 (found in the -// COPYING file in the root directory) and Apache 2.0 License -// (found in the LICENSE.Apache file in the root directory). -// -// Copyright (c) 2011 The LevelDB Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. See the AUTHORS file for names of contributors. - -#pragma once -#include -#include -#include -#include "port/port.h" -#include "rocksdb/env.h" -#include "rocksdb/listener.h" -#include "rocksdb/rate_limiter.h" -#include "test_util/sync_point.h" -#include "util/aligned_buffer.h" - -namespace rocksdb { - -class Statistics; -class HistogramImpl; - -// This file provides the following main abstractions: -// SequentialFileReader : wrapper over Env::SequentialFile -// RandomAccessFileReader : wrapper over Env::RandomAccessFile -// WritableFileWriter : wrapper over Env::WritableFile -// In addition, it also exposed NewReadaheadRandomAccessFile, NewWritableFile, -// and ReadOneLine primitives. - -// NewReadaheadRandomAccessFile provides a wrapper over RandomAccessFile to -// always prefetch additional data with every read. This is mainly used in -// Compaction Table Readers. -std::unique_ptr NewReadaheadRandomAccessFile( - std::unique_ptr&& file, size_t readahead_size); - -// SequentialFileReader is a wrapper on top of Env::SequentialFile. It handles -// Buffered (i.e when page cache is enabled) and Direct (with O_DIRECT / page -// cache disabled) reads appropriately, and also updates the IO stats. -class SequentialFileReader { - private: - std::unique_ptr file_; - std::string file_name_; - std::atomic offset_{0}; // read offset - - public: - explicit SequentialFileReader(std::unique_ptr&& _file, - const std::string& _file_name) - : file_(std::move(_file)), file_name_(_file_name) {} - - explicit SequentialFileReader(std::unique_ptr&& _file, - const std::string& _file_name, - size_t _readahead_size) - : file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size)), - file_name_(_file_name) {} - - SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { - *this = std::move(o); - } - - SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { - file_ = std::move(o.file_); - return *this; - } - - SequentialFileReader(const SequentialFileReader&) = delete; - SequentialFileReader& operator=(const SequentialFileReader&) = delete; - - Status Read(size_t n, Slice* result, char* scratch); - - Status Skip(uint64_t n); - - SequentialFile* file() { return file_.get(); } - - std::string file_name() { return file_name_; } - - bool use_direct_io() const { return file_->use_direct_io(); } - - private: - // NewReadaheadSequentialFile provides a wrapper over SequentialFile to - // always prefetch additional data with every read. - static std::unique_ptr NewReadaheadSequentialFile( - std::unique_ptr&& file, size_t readahead_size); -}; - -// RandomAccessFileReader is a wrapper on top of Env::RnadomAccessFile. It is -// responsible for: -// - Handling Buffered and Direct reads appropriately. -// - Rate limiting compaction reads. -// - Notifying any interested listeners on the completion of a read. -// - Updating IO stats. -class RandomAccessFileReader { - private: -#ifndef ROCKSDB_LITE - void NotifyOnFileReadFinish(uint64_t offset, size_t length, - const FileOperationInfo::TimePoint& start_ts, - const FileOperationInfo::TimePoint& finish_ts, - const Status& status) const { - FileOperationInfo info(file_name_, start_ts, finish_ts); - info.offset = offset; - info.length = length; - info.status = status; - - for (auto& listener : listeners_) { - listener->OnFileReadFinish(info); - } - } -#endif // ROCKSDB_LITE - - bool ShouldNotifyListeners() const { return !listeners_.empty(); } - - std::unique_ptr file_; - std::string file_name_; - Env* env_; - Statistics* stats_; - uint32_t hist_type_; - HistogramImpl* file_read_hist_; - RateLimiter* rate_limiter_; - std::vector> listeners_; - - public: - explicit RandomAccessFileReader( - std::unique_ptr&& raf, std::string _file_name, - Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, - HistogramImpl* file_read_hist = nullptr, - RateLimiter* rate_limiter = nullptr, - const std::vector>& listeners = {}) - : file_(std::move(raf)), - file_name_(std::move(_file_name)), - env_(env), - stats_(stats), - hist_type_(hist_type), - file_read_hist_(file_read_hist), - rate_limiter_(rate_limiter), - listeners_() { -#ifndef ROCKSDB_LITE - std::for_each(listeners.begin(), listeners.end(), - [this](const std::shared_ptr& e) { - if (e->ShouldBeNotifiedOnFileIO()) { - listeners_.emplace_back(e); - } - }); -#else // !ROCKSDB_LITE - (void)listeners; -#endif - } - - RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { - *this = std::move(o); - } - - RandomAccessFileReader& operator=(RandomAccessFileReader&& o) - ROCKSDB_NOEXCEPT { - file_ = std::move(o.file_); - env_ = std::move(o.env_); - stats_ = std::move(o.stats_); - hist_type_ = std::move(o.hist_type_); - file_read_hist_ = std::move(o.file_read_hist_); - rate_limiter_ = std::move(o.rate_limiter_); - return *this; - } - - RandomAccessFileReader(const RandomAccessFileReader&) = delete; - RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; - - Status Read(uint64_t offset, size_t n, Slice* result, char* scratch, - bool for_compaction = false) const; - - Status MultiRead(ReadRequest* reqs, size_t num_reqs) const; - - Status Prefetch(uint64_t offset, size_t n) const { - return file_->Prefetch(offset, n); - } - - RandomAccessFile* file() { return file_.get(); } - - std::string file_name() const { return file_name_; } - - bool use_direct_io() const { return file_->use_direct_io(); } -}; - -// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides -// facilities to: -// - Handle Buffered and Direct writes. -// - Rate limit writes. -// - Flush and Sync the data to the underlying filesystem. -// - Notify any interested listeners on the completion of a write. -// - Update IO stats. -class WritableFileWriter { - private: -#ifndef ROCKSDB_LITE - void NotifyOnFileWriteFinish(uint64_t offset, size_t length, - const FileOperationInfo::TimePoint& start_ts, - const FileOperationInfo::TimePoint& finish_ts, - const Status& status) { - FileOperationInfo info(file_name_, start_ts, finish_ts); - info.offset = offset; - info.length = length; - info.status = status; - - for (auto& listener : listeners_) { - listener->OnFileWriteFinish(info); - } - } -#endif // ROCKSDB_LITE - - bool ShouldNotifyListeners() const { return !listeners_.empty(); } - - std::unique_ptr writable_file_; - std::string file_name_; - Env* env_; - AlignedBuffer buf_; - size_t max_buffer_size_; - // Actually written data size can be used for truncate - // not counting padding data - uint64_t filesize_; -#ifndef ROCKSDB_LITE - // This is necessary when we use unbuffered access - // and writes must happen on aligned offsets - // so we need to go back and write that page again - uint64_t next_write_offset_; -#endif // ROCKSDB_LITE - bool pending_sync_; - uint64_t last_sync_size_; - uint64_t bytes_per_sync_; - RateLimiter* rate_limiter_; - Statistics* stats_; - std::vector> listeners_; - - public: - WritableFileWriter( - std::unique_ptr&& file, const std::string& _file_name, - const EnvOptions& options, Env* env = nullptr, - Statistics* stats = nullptr, - const std::vector>& listeners = {}) - : writable_file_(std::move(file)), - file_name_(_file_name), - env_(env), - buf_(), - max_buffer_size_(options.writable_file_max_buffer_size), - filesize_(0), -#ifndef ROCKSDB_LITE - next_write_offset_(0), -#endif // ROCKSDB_LITE - pending_sync_(false), - last_sync_size_(0), - bytes_per_sync_(options.bytes_per_sync), - rate_limiter_(options.rate_limiter), - stats_(stats), - listeners_() { - TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", - reinterpret_cast(max_buffer_size_)); - buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); - buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); -#ifndef ROCKSDB_LITE - std::for_each(listeners.begin(), listeners.end(), - [this](const std::shared_ptr& e) { - if (e->ShouldBeNotifiedOnFileIO()) { - listeners_.emplace_back(e); - } - }); -#else // !ROCKSDB_LITE - (void)listeners; -#endif - } - - WritableFileWriter(const WritableFileWriter&) = delete; - - WritableFileWriter& operator=(const WritableFileWriter&) = delete; - - ~WritableFileWriter() { Close(); } - - std::string file_name() const { return file_name_; } - - Status Append(const Slice& data); - - Status Pad(const size_t pad_bytes); - - Status Flush(); - - Status Close(); - - Status Sync(bool use_fsync); - - // Sync only the data that was already Flush()ed. Safe to call concurrently - // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(), - // returns NotSupported status. - Status SyncWithoutFlush(bool use_fsync); - - uint64_t GetFileSize() const { return filesize_; } - - Status InvalidateCache(size_t offset, size_t length) { - return writable_file_->InvalidateCache(offset, length); - } - - WritableFile* writable_file() const { return writable_file_.get(); } - - bool use_direct_io() { return writable_file_->use_direct_io(); } - - bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } - - private: - // Used when os buffering is OFF and we are writing - // DMA such as in Direct I/O mode -#ifndef ROCKSDB_LITE - Status WriteDirect(); -#endif // !ROCKSDB_LITE - // Normal write - Status WriteBuffered(const char* data, size_t size); - Status RangeSync(uint64_t offset, uint64_t nbytes); - Status SyncInternal(bool use_fsync); -}; - -// FilePrefetchBuffer is a smart buffer to store and read data from a file. -class FilePrefetchBuffer { - public: - // Constructor. - // - // All arguments are optional. - // file_reader : the file reader to use. Can be a nullptr. - // readahead_size : the initial readahead size. - // max_readahead_size : the maximum readahead size. - // If max_readahead_size > readahead_size, the readahead size will be - // doubled on every IO until max_readahead_size is hit. - // Typically this is set as a multiple of readahead_size. - // max_readahead_size should be greater than equal to readahead_size. - // enable : controls whether reading from the buffer is enabled. - // If false, TryReadFromCache() always return false, and we only take stats - // for the minimum offset if track_min_offset = true. - // track_min_offset : Track the minimum offset ever read and collect stats on - // it. Used for adaptable readahead of the file footer/metadata. - // - // Automatic readhead is enabled for a file if file_reader, readahead_size, - // and max_readahead_size are passed in. - // If file_reader is a nullptr, setting readadhead_size and max_readahead_size - // does not make any sense. So it does nothing. - // A user can construct a FilePrefetchBuffer without any arguments, but use - // `Prefetch` to load data into the buffer. - FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr, - size_t readadhead_size = 0, size_t max_readahead_size = 0, - bool enable = true, bool track_min_offset = false) - : buffer_offset_(0), - file_reader_(file_reader), - readahead_size_(readadhead_size), - max_readahead_size_(max_readahead_size), - min_offset_read_(port::kMaxSizet), - enable_(enable), - track_min_offset_(track_min_offset) {} - - // Load data into the buffer from a file. - // reader : the file reader. - // offset : the file offset to start reading from. - // n : the number of bytes to read. - // for_compaction : if prefetch is done for compaction read. - Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n, - bool for_compaction = false); - - // Tries returning the data for a file raed from this buffer, if that data is - // in the buffer. - // It handles tracking the minimum read offset if track_min_offset = true. - // It also does the exponential readahead when readadhead_size is set as part - // of the constructor. - // - // offset : the file offset. - // n : the number of bytes. - // result : output buffer to put the data into. - // for_compaction : if cache read is done for compaction read. - bool TryReadFromCache(uint64_t offset, size_t n, Slice* result, - bool for_compaction = false); - - // The minimum `offset` ever passed to TryReadFromCache(). This will nly be - // tracked if track_min_offset = true. - size_t min_offset_read() const { return min_offset_read_; } - - private: - AlignedBuffer buffer_; - uint64_t buffer_offset_; - RandomAccessFileReader* file_reader_; - size_t readahead_size_; - size_t max_readahead_size_; - // The minimum `offset` ever passed to TryReadFromCache(). - size_t min_offset_read_; - // if false, TryReadFromCache() always return false, and we only take stats - // for track_min_offset_ if track_min_offset_ = true - bool enable_; - // If true, track minimum `offset` ever passed to TryReadFromCache(), which - // can be fetched from min_offset_read(). - bool track_min_offset_; -}; - -// Returns a WritableFile. -// -// env : the Env. -// fname : the file name. -// result : output arg. A WritableFile based on `fname` returned. -// options : the Env Options. -extern Status NewWritableFile(Env* env, const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options); - -// Read a single line from a file. -bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file, - std::string* output, bool* has_data, Status* result); - -} // namespace rocksdb diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 1b86f798f7..9a07bccccd 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -3,9 +3,12 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // -#include "util/file_reader_writer.h" #include #include +#include "file/random_access_file_reader.h" +#include "file/readahead_raf.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" #include "test_util/testharness.h" #include "test_util/testutil.h" #include "util/random.h" diff --git a/util/log_write_bench.cc b/util/log_write_bench.cc index ac4cb685b6..0b5eee00a2 100644 --- a/util/log_write_bench.cc +++ b/util/log_write_bench.cc @@ -11,11 +11,11 @@ int main() { } #else +#include "file/writable_file_writer.h" #include "monitoring/histogram.h" #include "rocksdb/env.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" #include "util/gflags_compat.h" using GFLAGS_NAMESPACE::ParseCommandLineFlags; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index b7592a0ce2..9adeb721bc 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -11,6 +11,8 @@ #include "rocksdb/utilities/backupable_db.h" #include "file/filename.h" +#include "file/sequence_file_reader.h" +#include "file/writable_file_writer.h" #include "logging/logging.h" #include "port/port.h" #include "rocksdb/rate_limiter.h" @@ -19,7 +21,6 @@ #include "util/channel.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" #include "utilities/checkpoint/checkpoint_impl.h" diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 37d9e4cd18..725bc0740f 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -25,7 +25,6 @@ #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/random.h" #include "util/stderr_logger.h" diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 574338e529..448d846cb1 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -15,7 +15,9 @@ #include "db/write_batch_internal.h" #include "file/file_util.h" #include "file/filename.h" +#include "file/random_access_file_reader.h" #include "file/sst_file_manager_impl.h" +#include "file/writable_file_writer.h" #include "logging/logging.h" #include "monitoring/instrumented_mutex.h" #include "monitoring/statistics.h" @@ -31,7 +33,6 @@ #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/random.h" #include "util/stop_watch.h" diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index b74a211bc9..9905ce12e8 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -5,17 +5,18 @@ #ifndef ROCKSDB_LITE #include "utilities/blob_db/blob_dump_tool.h" -#include #include +#include #include #include #include +#include "file/random_access_file_reader.h" +#include "file/readahead_raf.h" #include "port/port.h" #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "table/format.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/string_util.h" namespace rocksdb { diff --git a/utilities/blob_db/blob_dump_tool.h b/utilities/blob_db/blob_dump_tool.h index ff4672fd3f..39f83737d4 100644 --- a/utilities/blob_db/blob_dump_tool.h +++ b/utilities/blob_db/blob_dump_tool.h @@ -8,9 +8,9 @@ #include #include #include +#include "file/random_access_file_reader.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "util/file_reader_writer.h" #include "utilities/blob_db/blob_log_format.h" namespace rocksdb { diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index 3f128c7d55..de16b1522a 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -17,6 +17,7 @@ #include "db/db_impl/db_impl.h" #include "db/dbformat.h" #include "file/filename.h" +#include "file/readahead_raf.h" #include "logging/logging.h" #include "utilities/blob_db/blob_db_impl.h" diff --git a/utilities/blob_db/blob_file.h b/utilities/blob_db/blob_file.h index 668a037228..5442b727fe 100644 --- a/utilities/blob_db/blob_file.h +++ b/utilities/blob_db/blob_file.h @@ -8,10 +8,10 @@ #include #include +#include "file/random_access_file_reader.h" #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/options.h" -#include "util/file_reader_writer.h" #include "utilities/blob_db/blob_log_format.h" #include "utilities/blob_db/blob_log_reader.h" #include "utilities/blob_db/blob_log_writer.h" diff --git a/utilities/blob_db/blob_log_reader.cc b/utilities/blob_db/blob_log_reader.cc index 8ffcc2fa1e..2ede0a8f2f 100644 --- a/utilities/blob_db/blob_log_reader.cc +++ b/utilities/blob_db/blob_log_reader.cc @@ -9,8 +9,8 @@ #include +#include "file/random_access_file_reader.h" #include "monitoring/statistics.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" namespace rocksdb { diff --git a/utilities/blob_db/blob_log_reader.h b/utilities/blob_db/blob_log_reader.h index af7971554c..6c3990e304 100644 --- a/utilities/blob_db/blob_log_reader.h +++ b/utilities/blob_db/blob_log_reader.h @@ -10,11 +10,11 @@ #include #include +#include "file/random_access_file_reader.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" -#include "util/file_reader_writer.h" #include "utilities/blob_db/blob_log_format.h" namespace rocksdb { diff --git a/utilities/blob_db/blob_log_writer.cc b/utilities/blob_db/blob_log_writer.cc index 51578c5c32..5087f67e79 100644 --- a/utilities/blob_db/blob_log_writer.cc +++ b/utilities/blob_db/blob_log_writer.cc @@ -9,10 +9,10 @@ #include #include +#include "file/writable_file_writer.h" #include "monitoring/statistics.h" #include "rocksdb/env.h" #include "util/coding.h" -#include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "utilities/blob_db/blob_log_format.h" diff --git a/utilities/persistent_cache/block_cache_tier_file.h b/utilities/persistent_cache/block_cache_tier_file.h index b7f820b068..ddea4032cb 100644 --- a/utilities/persistent_cache/block_cache_tier_file.h +++ b/utilities/persistent_cache/block_cache_tier_file.h @@ -11,6 +11,8 @@ #include #include +#include "file/random_access_file_reader.h" + #include "rocksdb/comparator.h" #include "rocksdb/env.h" @@ -21,7 +23,6 @@ #include "port/port.h" #include "util/crc32c.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" // The io code path of persistent cache uses pipelined architecture diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc index d84a593b9d..3e1f821f7a 100644 --- a/utilities/simulator_cache/sim_cache.cc +++ b/utilities/simulator_cache/sim_cache.cc @@ -5,10 +5,10 @@ #include "rocksdb/utilities/sim_cache.h" #include +#include "file/writable_file_writer.h" #include "monitoring/statistics.h" #include "port/port.h" #include "rocksdb/env.h" -#include "util/file_reader_writer.h" #include "util/mutexlock.h" #include "util/string_util.h" diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc index d8e36c3127..2910745e20 100644 --- a/utilities/trace/file_trace_reader_writer.cc +++ b/utilities/trace/file_trace_reader_writer.cc @@ -5,9 +5,10 @@ #include "utilities/trace/file_trace_reader_writer.h" +#include "file/random_access_file_reader.h" +#include "file/writable_file_writer.h" #include "trace_replay/trace_replay.h" #include "util/coding.h" -#include "util/file_reader_writer.h" namespace rocksdb {