diff --git a/CMakeLists.txt b/CMakeLists.txt index 702e1ca6d2..847e3f4adb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -796,6 +796,7 @@ set(SOURCES trace_replay/trace_record_result.cc trace_replay/trace_record.cc trace_replay/trace_replay.cc + util/async_file_reader.cc util/cleanable.cc util/coding.cc util/compaction_job_stats_impl.cc diff --git a/HISTORY.md b/HISTORY.md index bb1a063bf0..fa0e9d016c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ * Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion. * RemoteCompaction supports table_properties_collector_factories override on compaction worker. * Start tracking SST unique id in MANIFEST, which will be used to verify with SST properties during DB open to make sure the SST file is not overwritten or misplaced. A db option `verify_sst_unique_id_in_manifest` is introduced to enable/disable the verification, if enabled all SST files will be opened during DB-open to verify the unique id (default is false), so it's recommended to use it with `max_open_files = -1` to pre-open the files. +* Added the ability to concurrently read data blocks from multiple files in a level in batched MultiGet. This can be enabled by setting the async_io option in ReadOptions. Using this feature requires a FileSystem that supports ReadAsync (PosixFileSystem does not support it yet) and requires RocksDB to be compiled with folly and C++20. ### Public API changes * Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
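As a rough usage sketch of the feature described in the HISTORY.md entry above (not part of this patch, and assuming a folly/C++20 build with coroutine support plus a FileSystem whose ReadAsync is implemented), a caller opts in through ReadOptions::async_io and the existing batched MultiGet overload. The helper name and variables below are illustrative only:

    #include <vector>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/slice.h"
    #include "rocksdb/status.h"

    // Illustrative helper (not from this patch): batched MultiGet with async_io.
    void LookupBatch(rocksdb::DB* db, const std::vector<rocksdb::Slice>& keys) {
      rocksdb::ReadOptions ro;
      ro.async_io = true;  // allow data block reads from multiple files in a level in parallel
      std::vector<rocksdb::PinnableSlice> values(keys.size());
      std::vector<rocksdb::Status> statuses(keys.size());
      // Only the batched overload goes through the MultiGet batching path;
      // statuses[i] reports the lookup outcome for keys[i].
      db->MultiGet(ro, db->DefaultColumnFamily(), keys.size(), keys.data(),
                   values.data(), statuses.data());
    }

The MULTIGET_IO_BATCH_SIZE histogram, checked by the DBMultiGetAsyncIOTest cases later in this patch, records the number of async reads issued per batch and can be used to confirm that parallel reads actually occurred.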
diff --git a/Makefile b/Makefile index 1d01f0dcff..8e8c7ed5b8 100644 --- a/Makefile +++ b/Makefile @@ -136,6 +136,13 @@ CXXFLAGS += $(PLATFORM_SHARED_CFLAGS) -DROCKSDB_DLL CFLAGS += $(PLATFORM_SHARED_CFLAGS) -DROCKSDB_DLL endif +ifeq ($(USE_COROUTINES), 1) + USE_FOLLY = 1 + OPT += -DUSE_COROUTINES + ROCKSDB_CXX_STANDARD = c++2a + USE_RTTI = 1 +endif + # if we're compiling for release, compile without debug code (-DNDEBUG) ifeq ($(DEBUG_LEVEL),0) OPT += -DNDEBUG @@ -226,6 +233,7 @@ dummy := $(shell (export ROCKSDB_ROOT="$(CURDIR)"; \ export ROCKSDB_NO_FBCODE="$(ROCKSDB_NO_FBCODE)"; \ export USE_CLANG="$(USE_CLANG)"; \ export LIB_MODE="$(LIB_MODE)"; \ + export ROCKSDB_CXX_STANDARD="$(ROCKSDB_CXX_STANDARD)"; \ "$(CURDIR)/build_tools/build_detect_platform" "$(CURDIR)/make_config.mk")) # this file is generated by the previous line to set build flags and sources include make_config.mk diff --git a/TARGETS b/TARGETS index 0bc5ad0f53..5d0e2836ed 100644 --- a/TARGETS +++ b/TARGETS @@ -224,6 +224,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "trace_replay/trace_record_handler.cc", "trace_replay/trace_record_result.cc", "trace_replay/trace_replay.cc", + "util/async_file_reader.cc", "util/build_version.cc", "util/cleanable.cc", "util/coding.cc", @@ -327,7 +328,13 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "utilities/wal_filter.cc", "utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc", - ], deps=["//folly/container:f14_hash"], headers=None, link_whole=False, extra_test_libs=False) + ], deps=[ + "//folly/container:f14_hash", + "//folly/experimental/coro:blocking_wait", + "//folly/experimental/coro:collect", + "//folly/experimental/coro:coroutine", + "//folly/experimental/coro:task", + ], headers=None, link_whole=False, extra_test_libs=False) cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "cache/cache.cc", @@ -545,6 +552,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "trace_replay/trace_record_handler.cc", "trace_replay/trace_record_result.cc", "trace_replay/trace_replay.cc", + "util/async_file_reader.cc", "util/build_version.cc", "util/cleanable.cc", "util/coding.cc", @@ -648,7 +656,13 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "utilities/wal_filter.cc", "utilities/write_batch_with_index/write_batch_with_index.cc", "utilities/write_batch_with_index/write_batch_with_index_internal.cc", - ], deps=["//folly/container:f14_hash"], headers=None, link_whole=True, extra_test_libs=False) + ], deps=[ + "//folly/container:f14_hash", + "//folly/experimental/coro:blocking_wait", + "//folly/experimental/coro:collect", + "//folly/experimental/coro:coroutine", + "//folly/experimental/coro:task", + ], headers=None, link_whole=True, extra_test_libs=False) cpp_library_wrapper(name="rocksdb_test_lib", srcs=[ "db/db_test_util.cc", diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index e675c0df71..f285d49d17 100755 --- a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -145,7 +145,12 @@ def generate_targets(repo_path, deps_map): # always add range_tree, it's only excluded on ppc64, which we don't use internally src_mk["RANGE_TREE_SOURCES"] + src_mk["TOOL_LIB_SOURCES"], - deps=["//folly/container:f14_hash"]) + deps=[ + "//folly/container:f14_hash", + "//folly/experimental/coro:blocking_wait", + "//folly/experimental/coro:collect", + "//folly/experimental/coro:coroutine", + "//folly/experimental/coro:task"]) # rocksdb_whole_archive_lib 
TARGETS.add_library( "rocksdb_whole_archive_lib", @@ -153,7 +158,12 @@ def generate_targets(repo_path, deps_map): # always add range_tree, it's only excluded on ppc64, which we don't use internally src_mk["RANGE_TREE_SOURCES"] + src_mk["TOOL_LIB_SOURCES"], - deps=["//folly/container:f14_hash"], + deps=[ + "//folly/container:f14_hash", + "//folly/experimental/coro:blocking_wait", + "//folly/experimental/coro:collect", + "//folly/experimental/coro:coroutine", + "//folly/experimental/coro:task"], headers=None, extra_external_deps="", link_whole=True) diff --git a/build_tools/dependencies_platform009.sh b/build_tools/dependencies_platform009.sh index 427b62da48..ff0336e2c5 100644 --- a/build_tools/dependencies_platform009.sh +++ b/build_tools/dependencies_platform009.sh @@ -20,3 +20,7 @@ VALGRIND_BASE=/mnt/gvfs/third-party2/valgrind/6ae525939ad02e5e676855082fbbc7828d LUA_BASE=/mnt/gvfs/third-party2/lua/162efd9561a3d21f6869f4814011e9cf1b3ff4dc/5.3.4/platform009/a6271c4 BENCHMARK_BASE=/mnt/gvfs/third-party2/benchmark/30bf49ad6414325e17f3425b0edcb64239427ae3/1.6.1/platform009/7f3b187 BOOST_BASE=/mnt/gvfs/third-party2/boost/201b7d74941e54b436dfa364a063aa6d2cd7de4c/1.69.0/platform009/8a7ffdf +GLOG_BASE=/mnt/gvfs/third-party2/glog/32d751bd5673375b438158717ab6a57c1cc57e3d/0.3.2_fb/platform009/10a364d/ +FMT_BASE=/mnt/gvfs/third-party2/fmt/ce0c25f67165f4d2c22a29b8ef50f5600d7873ca/6.1.1/platform009/7f3b187/ +DBL_CONV_BASE=/mnt/gvfs/third-party2/double_conversion/109b3d9696d71f1048678cd7da1e22505470543d/20141126/platform009/7f3b187/ +LIBEVENT_BASE=/mnt/gvfs/third-party2/libevent/4a4d3a79a76c2439b6bd471bf3586b3481dde75e/1.4.14b_hphp/platform009/7f3b187/ diff --git a/build_tools/fbcode_config_platform009.sh b/build_tools/fbcode_config_platform009.sh index a22aaa90d5..b4e64b6692 100644 --- a/build_tools/fbcode_config_platform009.sh +++ b/build_tools/fbcode_config_platform009.sh @@ -14,7 +14,7 @@ source "$BASEDIR/dependencies_platform009.sh" CFLAGS="" # libgcc -LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/9.3.0" +LIBGCC_INCLUDE="$LIBGCC_BASE/include/c++/9.3.0 -I $LIBGCC_BASE/include/c++/9.3.0/backward" LIBGCC_LIBS=" -L $LIBGCC_BASE/lib" # glibc @@ -70,6 +70,18 @@ BENCHMARK_LIBS=" $BENCHMARK_BASE/lib/libbenchmark${MAYBE_PIC}.a" BOOST_INCLUDE=" -I $BOOST_BASE/include/" +GLOG_INCLUDE=" -I $GLOG_BASE/include/" +GLOG_LIBS=" $GLOG_BASE/lib/libglog${MAYBE_PIC}.a" + +FMT_INCLUDE=" -I $FMT_BASE/include/" +FMT_LIBS=" $FMT_BASE/lib/libfmt${MAYBE_PIC}.a" + +DBL_CONV_INCLUDE=" -I $DBL_CONV_BASE/include/" +DBL_CONV_LIBS=" $DBL_CONV_BASE/lib/libdouble-conversion${MAYBE_PIC}.a" + +LIBEVENT_INCLUDE=" -I $LIBEVENT_BASE/include/" +LIBEVENT_LIBS=" $LIBEVENT_BASE/lib/libevent${MAYBE_PIC}.a" + # location of jemalloc JEMALLOC_INCLUDE=" -I $JEMALLOC_BASE/include/" JEMALLOC_LIB=" $JEMALLOC_BASE/lib/libjemalloc${MAYBE_PIC}.a" @@ -101,7 +113,7 @@ BINUTILS="$BINUTILS_BASE/bin" AR="$BINUTILS/ar" AS="$BINUTILS/as" -DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE $LIBURING_INCLUDE $BENCHMARK_INCLUDE $BOOST_INCLUDE" +DEPS_INCLUDE="$SNAPPY_INCLUDE $ZLIB_INCLUDE $BZIP_INCLUDE $LZ4_INCLUDE $ZSTD_INCLUDE $GFLAGS_INCLUDE $NUMA_INCLUDE $TBB_INCLUDE $LIBURING_INCLUDE $BENCHMARK_INCLUDE $BOOST_INCLUDE $GLOG_INCLUDE $FMT_INCLUDE $DBL_CONV_INCLUDE $LIBEVENT_INCLUDE" STDLIBS="-L $GCC_BASE/lib64" diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc index 8fe39baf6c..a6bcea43b9 100644 --- a/db/db_basic_test.cc +++ b/db/db_basic_test.cc @@ -12,6 +12,7 @@ #include 
"db/db_test_util.h" #include "options/options_helper.h" #include "port/stack_trace.h" +#include "rocksdb/filter_policy.h" #include "rocksdb/flush_block_policy.h" #include "rocksdb/merge_operator.h" #include "rocksdb/perf_context.h" @@ -1180,10 +1181,17 @@ TEST_F(DBBasicTest, DBCloseFlushError) { Destroy(options); } -class DBMultiGetTestWithParam : public DBBasicTest, - public testing::WithParamInterface {}; +class DBMultiGetTestWithParam + : public DBBasicTest, + public testing::WithParamInterface> {}; TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, @@ -1240,7 +1248,8 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { keys.push_back(std::get<1>(cf_kv_vec[i])); } - values = MultiGet(cfs, keys, nullptr, GetParam()); + values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()), + std::get<1>(GetParam())); ASSERT_EQ(values.size(), num_keys); for (unsigned int j = 0; j < values.size(); ++j) { ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2"); @@ -1254,7 +1263,8 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { keys.push_back(std::get<1>(cf_kv_vec[3])); cfs.push_back(std::get<0>(cf_kv_vec[4])); keys.push_back(std::get<1>(cf_kv_vec[4])); - values = MultiGet(cfs, keys, nullptr, GetParam()); + values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()), + std::get<1>(GetParam())); ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2"); ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2"); ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2"); @@ -1267,7 +1277,8 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { keys.push_back(std::get<1>(cf_kv_vec[6])); cfs.push_back(std::get<0>(cf_kv_vec[1])); keys.push_back(std::get<1>(cf_kv_vec[1])); - values = MultiGet(cfs, keys, nullptr, GetParam()); + values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()), + std::get<1>(GetParam())); ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2"); ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2"); ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2"); @@ -1283,6 +1294,12 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) { } TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, @@ -1328,7 +1345,8 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { keys.push_back("cf" + std::to_string(i) + "_key"); } - values = MultiGet(cfs, keys, nullptr, GetParam()); + values = MultiGet(cfs, keys, nullptr, std::get<0>(GetParam()), + std::get<1>(GetParam())); ASSERT_TRUE(last_try); ASSERT_EQ(values.size(), 8); for (unsigned int j = 0; j < values.size(); ++j) { @@ -1345,6 +1363,12 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) { } TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich", "alyosha", "popovich"}, @@ -1389,7 +1413,8 
@@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) { } const Snapshot* snapshot = db_->GetSnapshot(); - values = MultiGet(cfs, keys, snapshot, GetParam()); + values = MultiGet(cfs, keys, snapshot, std::get<0>(GetParam()), + std::get<1>(GetParam())); db_->ReleaseSnapshot(snapshot); ASSERT_EQ(values.size(), 8); for (unsigned int j = 0; j < values.size(); ++j) { @@ -1405,6 +1430,12 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) { } TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES Options options = CurrentOptions(); CreateAndReopenWithCF({"one", "two"}, options); @@ -1417,8 +1448,9 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) { std::vector keys{"foo", "baz", "abc"}; std::vector values; - values = - MultiGet(cfs, keys, /* snapshot */ nullptr, /* batched */ GetParam()); + values = MultiGet(cfs, keys, /* snapshot */ nullptr, + /* batched */ std::get<0>(GetParam()), + /* async */ std::get<1>(GetParam())); ASSERT_EQ(values.size(), 3); ASSERT_EQ(values[0], "bar"); @@ -1426,10 +1458,18 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFUnsorted) { ASSERT_EQ(values[2], "def"); } -INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam, - testing::Bool()); - -TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedSimpleUnsorted) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched MultiGet"); + return; + } do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); SetPerfLevel(kEnableCount); @@ -1448,8 +1488,10 @@ TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) { std::vector cfs(keys.size(), handles_[1]); std::vector s(keys.size()); - db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(), - values.data(), s.data(), false); + ReadOptions ro; + ro.async_io = std::get<1>(GetParam()); + db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), + s.data(), false); ASSERT_EQ(values.size(), keys.size()); ASSERT_EQ(std::string(values[5].data(), values[5].size()), "v1"); @@ -1470,7 +1512,18 @@ TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) { } while (ChangeCompactOptions()); } -TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedSortedMultiFile) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched MultiGet"); + return; + } do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); SetPerfLevel(kEnableCount); @@ -1493,8 +1546,10 @@ TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) { std::vector cfs(keys.size(), handles_[1]); std::vector s(keys.size()); - db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(), - values.data(), s.data(), true); + ReadOptions ro; + ro.async_io = std::get<1>(GetParam()); + db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), + s.data(), true); ASSERT_EQ(values.size(), keys.size()); ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v1"); @@ -1515,7 
+1570,18 @@ TEST_F(DBBasicTest, MultiGetBatchedSortedMultiFile) { } while (ChangeOptions()); } -TEST_F(DBBasicTest, MultiGetBatchedDuplicateKeys) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedDuplicateKeys) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched MultiGet"); + return; + } Options opts = CurrentOptions(); opts.merge_operator = MergeOperators::CreateStringAppendOperator(); CreateAndReopenWithCF({"pikachu"}, opts); @@ -1546,8 +1612,10 @@ TEST_F(DBBasicTest, MultiGetBatchedDuplicateKeys) { std::vector cfs(keys.size(), handles_[1]); std::vector s(keys.size()); - db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(), - values.data(), s.data(), false); + ReadOptions ro; + ro.async_io = std::get<1>(GetParam()); + db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), + s.data(), false); ASSERT_EQ(values.size(), keys.size()); ASSERT_EQ(std::string(values[0].data(), values[0].size()), "v8"); @@ -1566,7 +1634,18 @@ TEST_F(DBBasicTest, MultiGetBatchedDuplicateKeys) { SetPerfLevel(kDisable); } -TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevel) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched MultiGet"); + return; + } Options options = CurrentOptions(); options.disable_auto_compactions = true; Reopen(options); @@ -1625,7 +1704,7 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) { keys.push_back("key_" + std::to_string(i)); } - values = MultiGet(keys, nullptr); + values = MultiGet(keys, nullptr, std::get<1>(GetParam())); ASSERT_EQ(values.size(), 16); for (unsigned int j = 0; j < values.size(); ++j) { int key = j + 64; @@ -1641,7 +1720,18 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) { } } -TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedMultiLevelMerge) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched MultiGet"); + return; + } Options options = CurrentOptions(); options.disable_auto_compactions = true; options.merge_operator = MergeOperators::CreateStringAppendOperator(); @@ -1705,7 +1795,7 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) { keys.push_back("key_" + std::to_string(i)); } - values = MultiGet(keys, nullptr); + values = MultiGet(keys, nullptr, std::get<1>(GetParam())); ASSERT_EQ(values.size(), keys.size()); for (unsigned int j = 0; j < 48; ++j) { int key = j + 32; @@ -1727,7 +1817,18 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) { } } -TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeInMemory) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched 
MultiGet"); + return; + } CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); SetPerfLevel(kEnableCount); ASSERT_OK(Put(1, "k1", "v_1")); @@ -1744,6 +1845,7 @@ TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) { get_perf_context()->Reset(); ReadOptions ro; ro.value_size_soft_limit = 11; + ro.async_io = std::get<1>(GetParam()); db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), s.data(), false); @@ -1761,7 +1863,17 @@ TEST_F(DBBasicTest, MultiGetBatchedValueSizeInMemory) { SetPerfLevel(kDisable); } -TEST_F(DBBasicTest, MultiGetBatchedValueSize) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSize) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + return; + } do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); SetPerfLevel(kEnableCount); @@ -1801,6 +1913,7 @@ TEST_F(DBBasicTest, MultiGetBatchedValueSize) { ReadOptions ro; ro.value_size_soft_limit = 20; + ro.async_io = std::get<1>(GetParam()); db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(), s.data(), false); @@ -1836,7 +1949,18 @@ TEST_F(DBBasicTest, MultiGetBatchedValueSize) { } while (ChangeCompactOptions()); } -TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) { +TEST_P(DBMultiGetTestWithParam, MultiGetBatchedValueSizeMultiLevelMerge) { +#ifndef USE_COROUTINES + if (std::get<1>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES + // Skip for unbatched MultiGet + if (!std::get<0>(GetParam())) { + ROCKSDB_GTEST_SKIP("This test is only for batched MultiGet"); + return; + } Options options = CurrentOptions(); options.disable_auto_compactions = true; options.merge_operator = MergeOperators::CreateStringAppendOperator(); @@ -1908,6 +2032,7 @@ TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) { ReadOptions read_options; read_options.verify_checksums = true; read_options.value_size_soft_limit = 380; + read_options.async_io = std::get<1>(GetParam()); db_->MultiGet(read_options, dbfull()->DefaultColumnFamily(), keys.size(), keys.data(), values.data(), statuses.data()); @@ -1939,6 +2064,217 @@ TEST_F(DBBasicTest, MultiGetBatchedValueSizeMultiLevelMerge) { } } +INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam, + testing::Combine(testing::Bool(), testing::Bool())); + +#if USE_COROUTINES +class DBMultiGetAsyncIOTest : public DBBasicTest { + public: + DBMultiGetAsyncIOTest() + : DBBasicTest(), statistics_(ROCKSDB_NAMESPACE::CreateDBStatistics()) { + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10)); + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.statistics = statistics_; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Reopen(options); + int num_keys = 0; + + // Put all keys in the bottommost level, and overwrite some keys + // in L0 and L1 + for (int i = 0; i < 128; ++i) { + EXPECT_OK(Put(Key(i), "val_l2_" + std::to_string(i))); + num_keys++; + if (num_keys == 8) { + EXPECT_OK(Flush()); + num_keys = 0; + } + } + if (num_keys > 0) { + EXPECT_OK(Flush()); + num_keys = 0; + } + MoveFilesToLevel(2); + + for (int i = 0; i < 128; i += 3) { + EXPECT_OK(Put(Key(i), "val_l1_" + std::to_string(i))); + num_keys++; + if (num_keys == 8) { + EXPECT_OK(Flush()); + num_keys = 0; + } + } + if (num_keys > 0) { + 
EXPECT_OK(Flush()); + num_keys = 0; + } + MoveFilesToLevel(1); + + for (int i = 0; i < 128; i += 5) { + EXPECT_OK(Put(Key(i), "val_l0_" + std::to_string(i))); + num_keys++; + if (num_keys == 8) { + EXPECT_OK(Flush()); + num_keys = 0; + } + } + if (num_keys > 0) { + EXPECT_OK(Flush()); + num_keys = 0; + } + EXPECT_EQ(0, num_keys); + } + + const std::shared_ptr& statistics() { return statistics_; } + + private: + std::shared_ptr statistics_; +}; + +TEST_F(DBMultiGetAsyncIOTest, GetFromL0) { + // All 3 keys in L0. The L0 files should be read serially. + std::vector key_strs{Key(0), Key(40), Key(80)}; + std::vector keys{key_strs[0], key_strs[1], key_strs[2]}; + std::vector values(key_strs.size()); + std::vector statuses(key_strs.size()); + + ReadOptions ro; + ro.async_io = true; + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + ASSERT_EQ(values.size(), 3); + ASSERT_OK(statuses[0]); + ASSERT_OK(statuses[1]); + ASSERT_OK(statuses[2]); + ASSERT_EQ(values[0], "val_l0_" + std::to_string(0)); + ASSERT_EQ(values[1], "val_l0_" + std::to_string(40)); + ASSERT_EQ(values[2], "val_l0_" + std::to_string(80)); + + HistogramData multiget_io_batch_size; + + statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); + + // No async IO in this case since we don't do parallel lookup in L0 + ASSERT_EQ(multiget_io_batch_size.count, 0); + ASSERT_EQ(multiget_io_batch_size.max, 0); +} + +TEST_F(DBMultiGetAsyncIOTest, GetFromL1) { + std::vector key_strs; + std::vector keys; + std::vector values; + std::vector statuses; + + key_strs.push_back(Key(33)); + key_strs.push_back(Key(54)); + key_strs.push_back(Key(102)); + keys.push_back(key_strs[0]); + keys.push_back(key_strs[1]); + keys.push_back(key_strs[2]); + values.resize(keys.size()); + statuses.resize(keys.size()); + + ReadOptions ro; + ro.async_io = true; + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + ASSERT_EQ(values.size(), 3); + ASSERT_EQ(statuses[0], Status::OK()); + ASSERT_EQ(statuses[1], Status::OK()); + ASSERT_EQ(statuses[2], Status::OK()); + ASSERT_EQ(values[0], "val_l1_" + std::to_string(33)); + ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); + ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); + + HistogramData multiget_io_batch_size; + + statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); + + // A batch of 3 async IOs is expected, one for each overlapping file in L1 + ASSERT_EQ(multiget_io_batch_size.count, 1); + ASSERT_EQ(multiget_io_batch_size.max, 3); +} + +TEST_F(DBMultiGetAsyncIOTest, LastKeyInFile) { + std::vector key_strs; + std::vector keys; + std::vector values; + std::vector statuses; + + // 21 is the last key in the first L1 file + key_strs.push_back(Key(21)); + key_strs.push_back(Key(54)); + key_strs.push_back(Key(102)); + keys.push_back(key_strs[0]); + keys.push_back(key_strs[1]); + keys.push_back(key_strs[2]); + values.resize(keys.size()); + statuses.resize(keys.size()); + + ReadOptions ro; + ro.async_io = true; + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + ASSERT_EQ(values.size(), 3); + ASSERT_EQ(statuses[0], Status::OK()); + ASSERT_EQ(statuses[1], Status::OK()); + ASSERT_EQ(statuses[2], Status::OK()); + ASSERT_EQ(values[0], "val_l1_" + std::to_string(21)); + ASSERT_EQ(values[1], "val_l1_" + std::to_string(54)); + ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); + + 
HistogramData multiget_io_batch_size; + + statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); + + // Since the first MultiGet key is the last key in a file, the MultiGet is + // expected to lookup in that file first, before moving on to other files. + // So the first file lookup will issue one async read, and the next lookup + // will lookup 2 files in parallel and issue 2 async reads + ASSERT_EQ(multiget_io_batch_size.count, 2); + ASSERT_EQ(multiget_io_batch_size.max, 2); +} + +TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) { + std::vector key_strs; + std::vector keys; + std::vector values; + std::vector statuses; + + // 33 and 102 are in L1, and 56 is in L2 + key_strs.push_back(Key(33)); + key_strs.push_back(Key(56)); + key_strs.push_back(Key(102)); + keys.push_back(key_strs[0]); + keys.push_back(key_strs[1]); + keys.push_back(key_strs[2]); + values.resize(keys.size()); + statuses.resize(keys.size()); + + ReadOptions ro; + ro.async_io = true; + dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(), + keys.data(), values.data(), statuses.data()); + ASSERT_EQ(values.size(), 3); + ASSERT_EQ(statuses[0], Status::OK()); + ASSERT_EQ(statuses[1], Status::OK()); + ASSERT_EQ(statuses[2], Status::OK()); + ASSERT_EQ(values[0], "val_l1_" + std::to_string(33)); + ASSERT_EQ(values[1], "val_l2_" + std::to_string(56)); + ASSERT_EQ(values[2], "val_l1_" + std::to_string(102)); + + HistogramData multiget_io_batch_size; + + statistics()->histogramData(MULTIGET_IO_BATCH_SIZE, &multiget_io_batch_size); + + // There is only one MultiGet key in the bottommost level - 56. Thus + // the bottommost level will not use async IO. + ASSERT_EQ(multiget_io_batch_size.count, 1); + ASSERT_EQ(multiget_io_batch_size.max, 2); +} +#endif // USE_COROUTINES + TEST_F(DBBasicTest, MultiGetStats) { Options options; options.create_if_missing = true; @@ -3308,6 +3644,11 @@ class DeadlineRandomAccessFile : public FSRandomAccessFileOwnerWrapper { IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg) override; + IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, + std::function cb, + void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, + IODebugContext* dbg) override; + private: DeadlineFS& fs_; std::unique_ptr file_; @@ -3448,6 +3789,26 @@ IOStatus DeadlineRandomAccessFile::Read(uint64_t offset, size_t len, return s; } +IOStatus DeadlineRandomAccessFile::ReadAsync( + FSReadRequest& req, const IOOptions& opts, + std::function cb, void* cb_arg, + void** io_handle, IOHandleDeleter* del_fn, IODebugContext* dbg) { + const std::chrono::microseconds deadline = fs_.GetDeadline(); + const std::chrono::microseconds io_timeout = fs_.GetIOTimeout(); + IOStatus s; + if (deadline.count() || io_timeout.count()) { + fs_.AssertDeadline(deadline, io_timeout, opts); + } + if (s.ok()) { + s = FSRandomAccessFileWrapper::ReadAsync(req, opts, cb, cb_arg, io_handle, + del_fn, dbg); + } + if (s.ok()) { + s = fs_.ShouldDelay(opts); + } + return s; +} + IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, @@ -3469,7 +3830,8 @@ IOStatus DeadlineRandomAccessFile::MultiRead(FSReadRequest* reqs, // A test class for intercepting random reads and injecting artificial // delays. 
Used for testing the MultiGet deadline feature -class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { +class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet, + public testing::WithParamInterface { public: DBBasicTestMultiGetDeadline() : DBBasicTestMultiGet( @@ -3492,7 +3854,13 @@ class DBBasicTestMultiGetDeadline : public DBBasicTestMultiGet { } }; -TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { +TEST_P(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { +#ifndef USE_COROUTINES + if (GetParam()) { + ROCKSDB_GTEST_SKIP("This test requires coroutine support"); + return; + } +#endif // USE_COROUTINES std::shared_ptr fs = std::make_shared(env_, false); std::unique_ptr env(new CompositeEnvWrapper(env_, fs)); Options options = CurrentOptions(); @@ -3523,6 +3891,7 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { ReadOptions ro; ro.deadline = std::chrono::microseconds{env->NowMicros() + 10000}; + ro.async_io = GetParam(); // Delay the first IO fs->SetDelayTrigger(ro.deadline, ro.io_timeout, 0); @@ -3625,6 +3994,9 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) { Close(); } +INSTANTIATE_TEST_CASE_P(DeadlineIO, DBBasicTestMultiGetDeadline, + ::testing::Bool()); + TEST_F(DBBasicTest, ManifestWriteFailure) { Options options = GetDefaultOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 62589ba1d7..62853380fa 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2590,7 +2590,8 @@ Status DBImpl::MultiGetImpl( ? MultiGetContext::MAX_BATCH_SIZE : keys_left; MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left, - batch_size, snapshot, read_options); + batch_size, snapshot, read_options, GetFileSystem(), + stats_); MultiGetRange range = ctx.GetMultiGetRange(); range.AddValueSize(curr_value_size); bool lookup_current = false; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 47bfd84216..e4099f10fa 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -830,10 +830,12 @@ std::string DBTestBase::Get(int cf, const std::string& k, std::vector DBTestBase::MultiGet(std::vector cfs, const std::vector& k, const Snapshot* snapshot, - const bool batched) { + const bool batched, + const bool async) { ReadOptions options; options.verify_checksums = true; options.snapshot = snapshot; + options.async_io = async; std::vector handles; std::vector keys; std::vector result; @@ -875,10 +877,12 @@ std::vector DBTestBase::MultiGet(std::vector cfs, } std::vector DBTestBase::MultiGet(const std::vector& k, - const Snapshot* snapshot) { + const Snapshot* snapshot, + const bool async) { ReadOptions options; options.verify_checksums = true; options.snapshot = snapshot; + options.async_io = async; std::vector keys; std::vector result(k.size()); std::vector statuses(k.size()); diff --git a/db/db_test_util.h b/db/db_test_util.h index dd1bd25d5d..3572a5d584 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1199,10 +1199,12 @@ class DBTestBase : public testing::Test { std::vector MultiGet(std::vector cfs, const std::vector& k, const Snapshot* snapshot, - const bool batched); + const bool batched, + const bool async = false); std::vector MultiGet(const std::vector& k, - const Snapshot* snapshot = nullptr); + const Snapshot* snapshot = nullptr, + const bool async = false); uint64_t GetNumSnapshots(); diff --git a/db/table_cache.cc b/db/table_cache.cc index 283cd77774..e121e66b2d 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -32,14 +32,30 @@ 
#include "util/stop_watch.h" namespace ROCKSDB_NAMESPACE { - namespace { - template static void DeleteEntry(const Slice& /*key*/, void* value) { T* typed_value = reinterpret_cast(value); delete typed_value; } +} // namespace +} // namespace ROCKSDB_NAMESPACE + +// Generate the regular and coroutine versions of some methods by +// including table_cache_sync_and_async.h twice +// Macros in the header will expand differently based on whether +// WITH_COROUTINES or WITHOUT_COROUTINES is defined +// clang-format off +#define WITHOUT_COROUTINES +#include "db/table_cache_sync_and_async.h" +#undef WITHOUT_COROUTINES +#define WITH_COROUTINES +#include "db/table_cache_sync_and_async.h" +// clang-format on + +namespace ROCKSDB_NAMESPACE { + +namespace { static void UnrefEntry(void* arg1, void* arg2) { Cache* cache = reinterpret_cast(arg1); @@ -484,131 +500,6 @@ Status TableCache::Get( return s; } -// Batched version of TableCache::MultiGet. -Status TableCache::MultiGet( - const ReadOptions& options, - const InternalKeyComparator& internal_comparator, - const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, - const std::shared_ptr& prefix_extractor, - HistogramImpl* file_read_hist, bool skip_filters, int level) { - auto& fd = file_meta.fd; - Status s; - TableReader* t = fd.table_reader; - Cache::Handle* handle = nullptr; - MultiGetRange table_range(*mget_range, mget_range->begin(), - mget_range->end()); -#ifndef ROCKSDB_LITE - autovector row_cache_entries; - IterKey row_cache_key; - size_t row_cache_key_prefix_size = 0; - KeyContext& first_key = *table_range.begin(); - bool lookup_row_cache = - ioptions_.row_cache && !first_key.get_context->NeedToReadSequence(); - - // Check row cache if enabled. Since row cache does not currently store - // sequence numbers, we cannot use it if we need to fetch the sequence. - if (lookup_row_cache) { - GetContext* first_context = first_key.get_context; - CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context, - row_cache_key); - row_cache_key_prefix_size = row_cache_key.Size(); - - for (auto miter = table_range.begin(); miter != table_range.end(); - ++miter) { - const Slice& user_key = miter->ukey_with_ts; - - GetContext* get_context = miter->get_context; - - if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size, - get_context)) { - table_range.SkipKey(miter); - } else { - row_cache_entries.emplace_back(); - get_context->SetReplayLog(&(row_cache_entries.back())); - } - } - } -#endif // ROCKSDB_LITE - - // Check that table_range is not empty. 
Its possible all keys may have been - // found in the row cache and thus the range may now be empty - if (s.ok() && !table_range.empty()) { - if (t == nullptr) { - s = FindTable(options, file_options_, internal_comparator, fd, &handle, - prefix_extractor, - options.read_tier == kBlockCacheTier /* no_io */, - true /* record_read_stats */, file_read_hist, skip_filters, - level, true /* prefetch_index_and_filter_in_cache */, - 0 /*max_file_size_for_l0_meta_pin*/, file_meta.temperature); - TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s); - if (s.ok()) { - t = GetTableReaderFromHandle(handle); - assert(t); - } - } - if (s.ok() && !options.ignore_range_deletions) { - std::unique_ptr range_del_iter( - t->NewRangeTombstoneIterator(options)); - if (range_del_iter != nullptr) { - for (auto iter = table_range.begin(); iter != table_range.end(); - ++iter) { - SequenceNumber* max_covering_tombstone_seq = - iter->get_context->max_covering_tombstone_seq(); - *max_covering_tombstone_seq = std::max( - *max_covering_tombstone_seq, - range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts)); - } - } - } - if (s.ok()) { - t->MultiGet(options, &table_range, prefix_extractor.get(), skip_filters); - } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { - for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) { - Status* status = iter->s; - if (status->IsIncomplete()) { - // Couldn't find Table in cache but treat as kFound if no_io set - iter->get_context->MarkKeyMayExist(); - s = Status::OK(); - } - } - } - } - -#ifndef ROCKSDB_LITE - if (lookup_row_cache) { - size_t row_idx = 0; - - for (auto miter = table_range.begin(); miter != table_range.end(); - ++miter) { - std::string& row_cache_entry = row_cache_entries[row_idx++]; - const Slice& user_key = miter->ukey_with_ts; - ; - GetContext* get_context = miter->get_context; - - get_context->SetReplayLog(nullptr); - // Compute row cache key. - row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(), - user_key.size()); - // Put the replay log in row cache only if something was found. - if (s.ok() && !row_cache_entry.empty()) { - size_t charge = row_cache_entry.capacity() + sizeof(std::string); - void* row_ptr = new std::string(std::move(row_cache_entry)); - // If row cache is full, it's OK. 
- ioptions_.row_cache - ->Insert(row_cache_key.GetUserKey(), row_ptr, charge, - &DeleteEntry) - .PermitUncheckedError(); - } - } - } -#endif // ROCKSDB_LITE - - if (handle != nullptr) { - ReleaseHandle(handle); - } - return s; -} - Status TableCache::GetTableProperties( const FileOptions& file_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, diff --git a/db/table_cache.h b/db/table_cache.h index fce50775ba..dc5c7a21e9 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -24,6 +24,7 @@ #include "rocksdb/table.h" #include "table/table_reader.h" #include "trace_replay/block_cache_tracer.h" +#include "util/coro_utils.h" namespace ROCKSDB_NAMESPACE { @@ -115,8 +116,8 @@ class TableCache { // in the embedded GetContext // @param skip_filters Disables loading/accessing the filter block // @param level The level this table is at, -1 for "not set / don't know" - Status MultiGet( - const ReadOptions& options, + DECLARE_SYNC_AND_ASYNC( + Status, MultiGet, const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, const std::shared_ptr& prefix_extractor = nullptr, diff --git a/db/table_cache_sync_and_async.h b/db/table_cache_sync_and_async.h new file mode 100644 index 0000000000..c6ae5f9b72 --- /dev/null +++ b/db/table_cache_sync_and_async.h @@ -0,0 +1,140 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/coro_utils.h" + +#if defined(WITHOUT_COROUTINES) || \ + (defined(USE_COROUTINES) && defined(WITH_COROUTINES)) +namespace ROCKSDB_NAMESPACE { + +#if defined(WITHOUT_COROUTINES) +#endif + +// Batched version of TableCache::MultiGet. +DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet) +(const ReadOptions& options, const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, const MultiGetContext::Range* mget_range, + const std::shared_ptr& prefix_extractor, + HistogramImpl* file_read_hist, bool skip_filters, int level) { + auto& fd = file_meta.fd; + Status s; + TableReader* t = fd.table_reader; + Cache::Handle* handle = nullptr; + MultiGetRange table_range(*mget_range, mget_range->begin(), + mget_range->end()); +#ifndef ROCKSDB_LITE + autovector row_cache_entries; + IterKey row_cache_key; + size_t row_cache_key_prefix_size = 0; + KeyContext& first_key = *table_range.begin(); + bool lookup_row_cache = + ioptions_.row_cache && !first_key.get_context->NeedToReadSequence(); + + // Check row cache if enabled. Since row cache does not currently store + // sequence numbers, we cannot use it if we need to fetch the sequence. + if (lookup_row_cache) { + GetContext* first_context = first_key.get_context; + CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context, + row_cache_key); + row_cache_key_prefix_size = row_cache_key.Size(); + + for (auto miter = table_range.begin(); miter != table_range.end(); + ++miter) { + const Slice& user_key = miter->ukey_with_ts; + + GetContext* get_context = miter->get_context; + + if (GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size, + get_context)) { + table_range.SkipKey(miter); + } else { + row_cache_entries.emplace_back(); + get_context->SetReplayLog(&(row_cache_entries.back())); + } + } + } +#endif // ROCKSDB_LITE + + // Check that table_range is not empty. 
Its possible all keys may have been + // found in the row cache and thus the range may now be empty + if (s.ok() && !table_range.empty()) { + if (t == nullptr) { + s = FindTable(options, file_options_, internal_comparator, fd, &handle, + prefix_extractor, + options.read_tier == kBlockCacheTier /* no_io */, + true /* record_read_stats */, file_read_hist, skip_filters, + level, true /* prefetch_index_and_filter_in_cache */, + 0 /*max_file_size_for_l0_meta_pin*/, file_meta.temperature); + TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s); + if (s.ok()) { + t = GetTableReaderFromHandle(handle); + assert(t); + } + } + if (s.ok() && !options.ignore_range_deletions) { + std::unique_ptr range_del_iter( + t->NewRangeTombstoneIterator(options)); + if (range_del_iter != nullptr) { + for (auto iter = table_range.begin(); iter != table_range.end(); + ++iter) { + SequenceNumber* max_covering_tombstone_seq = + iter->get_context->max_covering_tombstone_seq(); + *max_covering_tombstone_seq = std::max( + *max_covering_tombstone_seq, + range_del_iter->MaxCoveringTombstoneSeqnum(iter->ukey_with_ts)); + } + } + } + if (s.ok()) { + CO_AWAIT(t->MultiGet) + (options, &table_range, prefix_extractor.get(), skip_filters); + } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { + for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) { + Status* status = iter->s; + if (status->IsIncomplete()) { + // Couldn't find Table in cache but treat as kFound if no_io set + iter->get_context->MarkKeyMayExist(); + s = Status::OK(); + } + } + } + } + +#ifndef ROCKSDB_LITE + if (lookup_row_cache) { + size_t row_idx = 0; + + for (auto miter = table_range.begin(); miter != table_range.end(); + ++miter) { + std::string& row_cache_entry = row_cache_entries[row_idx++]; + const Slice& user_key = miter->ukey_with_ts; + ; + GetContext* get_context = miter->get_context; + + get_context->SetReplayLog(nullptr); + // Compute row cache key. + row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(), + user_key.size()); + // Put the replay log in row cache only if something was found. + if (s.ok() && !row_cache_entry.empty()) { + size_t charge = row_cache_entry.capacity() + sizeof(std::string); + void* row_ptr = new std::string(std::move(row_cache_entry)); + // If row cache is full, it's OK. 
+ ioptions_.row_cache + ->Insert(row_cache_key.GetUserKey(), row_ptr, charge, + &DeleteEntry) + .PermitUncheckedError(); + } + } + } +#endif // ROCKSDB_LITE + + if (handle != nullptr) { + ReleaseHandle(handle); + } + CO_RETURN s; +} +} // namespace ROCKSDB_NAMESPACE +#endif diff --git a/db/version_set.cc b/db/version_set.cc index 34fcaf1fed..f9bfd66d4f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -38,6 +38,10 @@ #include "db/table_cache.h" #include "db/version_builder.h" #include "db/version_edit_handler.h" +#if USE_COROUTINES +#include "folly/experimental/coro/BlockingWait.h" +#include "folly/experimental/coro/Collect.h" +#endif #include "file/filename.h" #include "file/random_access_file_reader.h" #include "file/read_write_util.h" @@ -63,10 +67,23 @@ #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/coding.h" +#include "util/coro_utils.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/user_comparator_wrapper.h" +// Generate the regular and coroutine versions of some methods by +// including version_set_sync_and_async.h twice +// Macros in the header will expand differently based on whether +// WITH_COROUTINES or WITHOUT_COROUTINES is defined +// clang-format off +#define WITHOUT_COROUTINES +#include "db/version_set_sync_and_async.h" +#undef WITHOUT_COROUTINES +#define WITH_COROUTINES +#include "db/version_set_sync_and_async.h" +// clang-format on + namespace ROCKSDB_NAMESPACE { namespace { @@ -505,68 +522,63 @@ class FilePickerMultiGet { return file_hit; } - FdWithKeyRange* GetNextFile() { - while (!search_ended_) { - // Start searching next level. - if (batch_iter_ == current_level_range_.end()) { - search_ended_ = !PrepareNextLevel(); - continue; - } else { - if (maybe_repeat_key_) { - maybe_repeat_key_ = false; - // Check if we found the final value for the last key in the - // previous lookup range. If we did, then there's no need to look - // any further for that key, so advance batch_iter_. Else, keep - // batch_iter_ positioned on that key so we look it up again in - // the next file - // For L0, always advance the key because we will look in the next - // file regardless for all keys not found yet - if (current_level_range_.CheckKeyDone(batch_iter_) || - curr_level_ == 0) { - batch_iter_ = upper_key_; - } - } - // batch_iter_prev_ will become the start key for the next file - // lookup - batch_iter_prev_ = batch_iter_; - } + void PrepareNextLevelForSearch() { search_ended_ = !PrepareNextLevel(); } - MultiGetRange next_file_range(current_level_range_, batch_iter_prev_, - current_level_range_.end()); - size_t curr_file_index = - (batch_iter_ != current_level_range_.end()) - ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level - : curr_file_level_->num_files; - FdWithKeyRange* f; - bool is_last_key_in_file; - if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f, - &is_last_key_in_file)) { - search_ended_ = !PrepareNextLevel(); - } else { - if (is_last_key_in_file) { - // Since cmp_largest is 0, batch_iter_ still points to the last key - // that falls in this file, instead of the next one. 
Increment - // the file index for all keys between batch_iter_ and upper_key_ - auto tmp_iter = batch_iter_; - while (tmp_iter != upper_key_) { - ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level); - ++tmp_iter; - } - maybe_repeat_key_ = true; + FdWithKeyRange* GetNextFileInLevel() { + if (batch_iter_ == current_level_range_.end() || search_ended_) { + return nullptr; + } else { + if (maybe_repeat_key_) { + maybe_repeat_key_ = false; + // Check if we found the final value for the last key in the + // previous lookup range. If we did, then there's no need to look + // any further for that key, so advance batch_iter_. Else, keep + // batch_iter_ positioned on that key so we look it up again in + // the next file + // For L0, always advance the key because we will look in the next + // file regardless for all keys not found yet + if (current_level_range_.CheckKeyDone(batch_iter_) || + curr_level_ == 0) { + batch_iter_ = upper_key_; } - // Set the range for this file - current_file_range_ = - MultiGetRange(next_file_range, batch_iter_prev_, upper_key_); - returned_file_level_ = curr_level_; - hit_file_level_ = curr_level_; - is_hit_file_last_in_level_ = - curr_file_index == curr_file_level_->num_files - 1; - return f; } + // batch_iter_prev_ will become the start key for the next file + // lookup + batch_iter_prev_ = batch_iter_; } - // Search ended - return nullptr; + MultiGetRange next_file_range(current_level_range_, batch_iter_prev_, + current_level_range_.end()); + size_t curr_file_index = + (batch_iter_ != current_level_range_.end()) + ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level + : curr_file_level_->num_files; + FdWithKeyRange* f; + bool is_last_key_in_file; + if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f, + &is_last_key_in_file)) { + return nullptr; + } else { + if (is_last_key_in_file) { + // Since cmp_largest is 0, batch_iter_ still points to the last key + // that falls in this file, instead of the next one. Increment + // the file index for all keys between batch_iter_ and upper_key_ + auto tmp_iter = batch_iter_; + while (tmp_iter != upper_key_) { + ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level); + ++tmp_iter; + } + maybe_repeat_key_ = true; + } + // Set the range for this file + current_file_range_ = + MultiGetRange(next_file_range, batch_iter_prev_, upper_key_); + returned_file_level_ = curr_level_; + hit_file_level_ = curr_level_; + is_hit_file_last_in_level_ = + curr_file_index == curr_file_level_->num_files - 1; + return f; + } } // getter for current file level @@ -577,8 +589,16 @@ class FilePickerMultiGet { // GetNextFile()) is at the last index in its level. 
bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; } + bool KeyMaySpanNextFile() { return maybe_repeat_key_; } + + bool IsSearchEnded() { return search_ended_; } + const MultiGetRange& CurrentFileRange() { return current_file_range_; } + bool RemainingOverlapInLevel() { + return !current_level_range_.Suffix(current_file_range_).empty(); + } + private: unsigned int num_levels_; unsigned int curr_level_; @@ -2218,7 +2238,7 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, &file_picker_range, &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, user_comparator(), internal_comparator()); - FdWithKeyRange* f = fp.GetNextFile(); + FdWithKeyRange* f = fp.GetNextFileInLevel(); Status s; uint64_t num_index_read = 0; uint64_t num_filter_read = 0; @@ -2228,164 +2248,92 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range, MultiGetRange keys_with_blobs_range(*range, range->begin(), range->end()); // blob_file => [[blob_idx, it], ...] std::unordered_map blob_rqs; - int level = -1; + int prev_level = -1; - while (f != nullptr) { - MultiGetRange file_range = fp.CurrentFileRange(); - bool timer_enabled = - GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && - get_perf_context()->per_level_perf_context_enabled; + while (!fp.IsSearchEnded()) { + // This will be set to true later if we actually look up in a file in L0. + // For per level stats purposes, an L0 file is treated as a level + bool dump_stats_for_l0_file = false; - // Report MultiGet stats per level. - if (level >= 0 && level != (int)fp.GetHitFileLevel()) { - // Dump the stats if the search has moved to the next level and - // reset for next level. - RecordInHistogram(db_statistics_, - NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, - num_index_read + num_filter_read); - RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL, - num_data_read); - RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read); - num_filter_read = 0; - num_index_read = 0; - num_data_read = 0; - num_sst_read = 0; - level = fp.GetHitFileLevel(); - } - - StopWatchNano timer(clock_, timer_enabled /* auto_start */); - s = table_cache_->MultiGet( - read_options, *internal_comparator(), *f->file_metadata, &file_range, - mutable_cf_options_.prefix_extractor, - cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), - IsFilterSkipped(static_cast(fp.GetHitFileLevel()), - fp.IsHitFileLastInLevel()), - fp.GetHitFileLevel()); - // TODO: examine the behavior for corrupted key - if (timer_enabled) { - PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), - fp.GetHitFileLevel()); - } - if (!s.ok()) { - // TODO: Set status for individual keys appropriately - for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) { - *iter->s = s; - file_range.MarkKeyDone(iter); - } - return; - } - uint64_t batch_size = 0; - for (auto iter = file_range.begin(); s.ok() && iter != file_range.end(); - ++iter) { - GetContext& get_context = *iter->get_context; - Status* status = iter->s; - // The Status in the KeyContext takes precedence over GetContext state - // Status may be an error if there were any IO errors in the table - // reader. 
We never expect Status to be NotFound(), as that is - // determined by get_context - assert(!status->IsNotFound()); - if (!status->ok()) { - file_range.MarkKeyDone(iter); - continue; - } - - if (get_context.sample()) { - sample_file_read_inc(f->file_metadata); - } - batch_size++; - num_index_read += get_context.get_context_stats_.num_index_read; - num_filter_read += get_context.get_context_stats_.num_filter_read; - num_data_read += get_context.get_context_stats_.num_data_read; - num_sst_read += get_context.get_context_stats_.num_sst_read; - // Reset these stats since they're specific to a level - get_context.get_context_stats_.num_index_read = 0; - get_context.get_context_stats_.num_filter_read = 0; - get_context.get_context_stats_.num_data_read = 0; - get_context.get_context_stats_.num_sst_read = 0; - - // report the counters before returning - if (get_context.State() != GetContext::kNotFound && - get_context.State() != GetContext::kMerge && - db_statistics_ != nullptr) { - get_context.ReportCounters(); - } else { - if (iter->max_covering_tombstone_seq > 0) { - // The remaining files we look at will only contain covered keys, so - // we stop here for this key - file_picker_range.SkipKey(iter); + // Avoid using the coroutine version if we're looking in a L0 file, since + // L0 files won't be parallelized anyway. The regular synchronous version + // is faster. + if (!read_options.async_io || !using_coroutines() || + fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) { + if (f) { + // Call MultiGetFromSST for looking up a single file + s = MultiGetFromSST(read_options, fp.CurrentFileRange(), + fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f, + blob_rqs, num_filter_read, num_index_read, + num_data_read, num_sst_read); + if (fp.GetHitFileLevel() == 0) { + dump_stats_for_l0_file = true; } } - switch (get_context.State()) { - case GetContext::kNotFound: - // Keep searching in other files - break; - case GetContext::kMerge: - // TODO: update per-level perfcontext user_key_return_count for kMerge - break; - case GetContext::kFound: - if (fp.GetHitFileLevel() == 0) { - RecordTick(db_statistics_, GET_HIT_L0); - } else if (fp.GetHitFileLevel() == 1) { - RecordTick(db_statistics_, GET_HIT_L1); - } else if (fp.GetHitFileLevel() >= 2) { - RecordTick(db_statistics_, GET_HIT_L2_AND_UP); - } - - PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, - fp.GetHitFileLevel()); - - file_range.MarkKeyDone(iter); - - if (iter->is_blob_index) { - if (iter->value) { - TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex", - &(*iter)); - - const Slice& blob_index_slice = *(iter->value); - BlobIndex blob_index; - Status tmp_s = blob_index.DecodeFrom(blob_index_slice); - if (tmp_s.ok()) { - const uint64_t blob_file_num = blob_index.file_number(); - blob_rqs[blob_file_num].emplace_back( - std::make_pair(blob_index, std::cref(*iter))); - } else { - *(iter->s) = tmp_s; - } - } - } else { - file_range.AddValueSize(iter->value->size()); - if (file_range.GetValueSize() > - read_options.value_size_soft_limit) { - s = Status::Aborted(); - break; - } - } - continue; - case GetContext::kDeleted: - // Use empty error message for speed - *status = Status::NotFound(); - file_range.MarkKeyDone(iter); - continue; - case GetContext::kCorrupt: - *status = - Status::Corruption("corrupted key for ", iter->lkey->user_key()); - file_range.MarkKeyDone(iter); - continue; - case GetContext::kUnexpectedBlobIndex: - ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); - *status = Status::NotSupported( - 
"Encounter unexpected blob index. Please open DB with " - "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); - file_range.MarkKeyDone(iter); - continue; + if (s.ok()) { + f = fp.GetNextFileInLevel(); } - } +#if USE_COROUTINES + } else { + std::vector> mget_tasks; + while (f != nullptr) { + mget_tasks.emplace_back(MultiGetFromSSTCoroutine( + read_options, fp.CurrentFileRange(), fp.GetHitFileLevel(), + fp.IsHitFileLastInLevel(), f, blob_rqs, num_filter_read, + num_index_read, num_data_read, num_sst_read)); + if (fp.KeyMaySpanNextFile()) { + break; + } + f = fp.GetNextFileInLevel(); + } + if (mget_tasks.size() > 0) { + // Collect all results so far + std::vector statuses = folly::coro::blockingWait( + folly::coro::collectAllRange(std::move(mget_tasks)) + .scheduleOn(&range->context()->executor())); + for (Status stat : statuses) { + if (!stat.ok()) { + s = stat; + } + } - RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size); + if (s.ok() && fp.KeyMaySpanNextFile()) { + f = fp.GetNextFileInLevel(); + } + } +#endif // USE_COROUTINES + } + // If bad status or we found final result for all the keys if (!s.ok() || file_picker_range.empty()) { break; } - f = fp.GetNextFile(); + if (!f) { + // Reached the end of this level. Prepare the next level + fp.PrepareNextLevelForSearch(); + if (!fp.IsSearchEnded()) { + // Its possible there is no overlap on this level and f is nullptr + f = fp.GetNextFileInLevel(); + } + if (dump_stats_for_l0_file || + (prev_level != 0 && prev_level != (int)fp.GetHitFileLevel())) { + // Dump the stats if the search has moved to the next level and + // reset for next level. + if (num_sst_read || (num_filter_read + num_index_read)) { + RecordInHistogram(db_statistics_, + NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL, + num_index_read + num_filter_read); + RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL, + num_data_read); + RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, + num_sst_read); + } + num_filter_read = 0; + num_index_read = 0; + num_data_read = 0; + num_sst_read = 0; + } + prev_level = fp.GetHitFileLevel(); + } } // Dump stats for most recent level diff --git a/db/version_set.h b/db/version_set.h index 9a486f8956..d2fd8ddb5d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -54,6 +54,7 @@ #include "table/get_context.h" #include "table/multiget_context.h" #include "trace_replay/block_cache_tracer.h" +#include "util/coro_utils.h" #include "util/hash_containers.h" namespace ROCKSDB_NAMESPACE { @@ -884,6 +885,14 @@ class Version { // This accumulated stats will be used in compaction. void UpdateAccumulatedStats(); + DECLARE_SYNC_AND_ASYNC( + /* ret_type */ Status, /* func_name */ MultiGetFromSST, + const ReadOptions& read_options, MultiGetRange file_range, + int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f, + std::unordered_map& blob_rqs, + uint64_t& num_filter_read, uint64_t& num_index_read, + uint64_t& num_data_read, uint64_t& num_sst_read); + ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs Logger* info_log_; Statistics* db_statistics_; diff --git a/db/version_set_sync_and_async.h b/db/version_set_sync_and_async.h new file mode 100644 index 0000000000..9dd50d21b8 --- /dev/null +++ b/db/version_set_sync_and_async.h @@ -0,0 +1,154 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/coro_utils.h" + +#if defined(WITHOUT_COROUTINES) || \ + (defined(USE_COROUTINES) && defined(WITH_COROUTINES)) + +namespace ROCKSDB_NAMESPACE { + +// Lookup a batch of keys in a single SST file +DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) +(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level, + bool is_hit_file_last_in_level, FdWithKeyRange* f, + std::unordered_map& blob_rqs, + uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_data_read, + uint64_t& num_sst_read) { + bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && + get_perf_context()->per_level_perf_context_enabled; + + Status s; + StopWatchNano timer(clock_, timer_enabled /* auto_start */); + s = CO_AWAIT(table_cache_->MultiGet)( + read_options, *internal_comparator(), *f->file_metadata, &file_range, + mutable_cf_options_.prefix_extractor, + cfd_->internal_stats()->GetFileReadHist(hit_file_level), + IsFilterSkipped(static_cast(hit_file_level), + is_hit_file_last_in_level), + hit_file_level); + // TODO: examine the behavior for corrupted key + if (timer_enabled) { + PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), + hit_file_level); + } + if (!s.ok()) { + // TODO: Set status for individual keys appropriately + for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) { + *iter->s = s; + file_range.MarkKeyDone(iter); + } + CO_RETURN s; + } + uint64_t batch_size = 0; + for (auto iter = file_range.begin(); s.ok() && iter != file_range.end(); + ++iter) { + GetContext& get_context = *iter->get_context; + Status* status = iter->s; + // The Status in the KeyContext takes precedence over GetContext state + // Status may be an error if there were any IO errors in the table + // reader. 
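// Illustrative aside (not part of the patch): a minimal, self-contained sketch of the
// fan-out pattern Version::MultiGet uses above -- one folly::coro::Task per file-level
// lookup (MultiGetFromSSTCoroutine), collected with collectAllRange() and driven to
// completion by blockingWait() on an executor. LookupOneFile and the
// CPUThreadPoolExecutor below are made-up stand-ins; RocksDB schedules its tasks on the
// executor supplied through the MultiGetContext instead.
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/experimental/coro/BlockingWait.h>
#include <folly/experimental/coro/Collect.h>
#include <folly/experimental/coro/Task.h>

#include <cstddef>
#include <vector>

namespace {
// Stand-in for a per-file coroutine lookup; a real one would co_await async block reads.
folly::coro::Task<int> LookupOneFile(int file_idx) { co_return file_idx * 2; }
}  // namespace

int main() {
  folly::CPUThreadPoolExecutor executor(4);
  std::vector<folly::coro::Task<int>> tasks;
  for (int i = 0; i < 3; ++i) {
    tasks.emplace_back(LookupOneFile(i));
  }
  // Suspend until every per-file task finishes; results come back in the
  // same order the tasks were enqueued.
  std::vector<int> results = folly::coro::blockingWait(
      folly::coro::collectAllRange(std::move(tasks)).scheduleOn(&executor));
  return results.size() == 3 ? 0 : 1;
}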
We never expect Status to be NotFound(), as that is + // determined by get_context + assert(!status->IsNotFound()); + if (!status->ok()) { + file_range.MarkKeyDone(iter); + continue; + } + + if (get_context.sample()) { + sample_file_read_inc(f->file_metadata); + } + batch_size++; + num_index_read += get_context.get_context_stats_.num_index_read; + num_filter_read += get_context.get_context_stats_.num_filter_read; + num_data_read += get_context.get_context_stats_.num_data_read; + num_sst_read += get_context.get_context_stats_.num_sst_read; + // Reset these stats since they're specific to a level + get_context.get_context_stats_.num_index_read = 0; + get_context.get_context_stats_.num_filter_read = 0; + get_context.get_context_stats_.num_data_read = 0; + get_context.get_context_stats_.num_sst_read = 0; + + // report the counters before returning + if (get_context.State() != GetContext::kNotFound && + get_context.State() != GetContext::kMerge && + db_statistics_ != nullptr) { + get_context.ReportCounters(); + } else { + if (iter->max_covering_tombstone_seq > 0) { + // The remaining files we look at will only contain covered keys, so + // we stop here for this key + file_range.SkipKey(iter); + } + } + switch (get_context.State()) { + case GetContext::kNotFound: + // Keep searching in other files + break; + case GetContext::kMerge: + // TODO: update per-level perfcontext user_key_return_count for kMerge + break; + case GetContext::kFound: + if (hit_file_level == 0) { + RecordTick(db_statistics_, GET_HIT_L0); + } else if (hit_file_level == 1) { + RecordTick(db_statistics_, GET_HIT_L1); + } else if (hit_file_level >= 2) { + RecordTick(db_statistics_, GET_HIT_L2_AND_UP); + } + + PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level); + + file_range.MarkKeyDone(iter); + + if (iter->is_blob_index) { + if (iter->value) { + TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex", + &(*iter)); + + const Slice& blob_index_slice = *(iter->value); + BlobIndex blob_index; + Status tmp_s = blob_index.DecodeFrom(blob_index_slice); + if (tmp_s.ok()) { + const uint64_t blob_file_num = blob_index.file_number(); + blob_rqs[blob_file_num].emplace_back( + std::make_pair(blob_index, std::cref(*iter))); + } else { + *(iter->s) = tmp_s; + } + } + } else { + file_range.AddValueSize(iter->value->size()); + if (file_range.GetValueSize() > read_options.value_size_soft_limit) { + s = Status::Aborted(); + break; + } + } + continue; + case GetContext::kDeleted: + // Use empty error message for speed + *status = Status::NotFound(); + file_range.MarkKeyDone(iter); + continue; + case GetContext::kCorrupt: + *status = + Status::Corruption("corrupted key for ", iter->lkey->user_key()); + file_range.MarkKeyDone(iter); + continue; + case GetContext::kUnexpectedBlobIndex: + ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); + *status = Status::NotSupported( + "Encounter unexpected blob index. Please open DB with " + "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); + file_range.MarkKeyDone(iter); + continue; + } + } + + RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size); + CO_RETURN s; +} +} // namespace ROCKSDB_NAMESPACE +#endif diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 9bc7ab196d..6f9ca1da7c 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -543,6 +543,9 @@ enum Histograms : uint32_t { // Number of prefetched bytes discarded by RocksDB. 
PREFETCHED_BYTES_DISCARDED, + // Number of IOs issued in parallel in a MultiGet batch + MULTIGET_IO_BATCH_SIZE, + HISTOGRAM_ENUM_MAX, }; diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 4d0d55c124..314c258ebc 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -5593,6 +5593,8 @@ class HistogramTypeJni { return 0x34; case ROCKSDB_NAMESPACE::Histograms::PREFETCHED_BYTES_DISCARDED: return 0x35; + case ROCKSDB_NAMESPACE::Histograms::MULTIGET_IO_BATCH_SIZE: + return 0x36; case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX: // 0x1F for backwards compatibility on current minor version. return 0x1F; @@ -5716,6 +5718,8 @@ class HistogramTypeJni { return ROCKSDB_NAMESPACE::Histograms::POLL_WAIT_MICROS; case 0x35: return ROCKSDB_NAMESPACE::Histograms::PREFETCHED_BYTES_DISCARDED; + case 0x36: + return ROCKSDB_NAMESPACE::Histograms::MULTIGET_IO_BATCH_SIZE; case 0x1F: // 0x1F for backwards compatibility on current minor version. return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX; diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 388acaf4d9..8afffa5f6d 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -286,7 +286,7 @@ const std::vector> HistogramsNameMap = { {ASYNC_READ_BYTES, "rocksdb.async.read.bytes"}, {POLL_WAIT_MICROS, "rocksdb.poll.wait.micros"}, {PREFETCHED_BYTES_DISCARDED, "rocksdb.prefetched.bytes.discarded"}, - + {MULTIGET_IO_BATCH_SIZE, "rocksdb.multiget.io.batch.size"}, }; std::shared_ptr CreateDBStatistics() { diff --git a/src.mk b/src.mk index 747d18d2b6..6ed13512ab 100644 --- a/src.mk +++ b/src.mk @@ -211,6 +211,7 @@ LIB_SOURCES = \ trace_replay/trace_replay.cc \ trace_replay/block_cache_tracer.cc \ trace_replay/io_tracer.cc \ + util/async_file_reader.cc \ util/build_version.cc \ util/cleanable.cc \ util/coding.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 4d53032a4d..3f3b5c4818 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -74,6 +74,30 @@ #include "util/stop_watch.h" #include "util/string_util.h" +namespace ROCKSDB_NAMESPACE { +namespace { + +CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) { + CacheAllocationPtr heap_buf; + heap_buf = AllocateBlock(buf.size(), allocator); + memcpy(heap_buf.get(), buf.data(), buf.size()); + return heap_buf; +} +} // namespace +} // namespace ROCKSDB_NAMESPACE + +// Generate the regular and coroutine versions of some methods by +// including block_based_table_reader_sync_and_async.h twice +// Macros in the header will expand differently based on whether +// WITH_COROUTINES or WITHOUT_COROUTINES is defined +// clang-format off +#define WITHOUT_COROUTINES +#include "table/block_based/block_based_table_reader_sync_and_async.h" +#undef WITHOUT_COROUTINES +#define WITH_COROUTINES +#include "table/block_based/block_based_table_reader_sync_and_async.h" +// clang-format on + namespace ROCKSDB_NAMESPACE { extern const uint64_t kBlockBasedTableMagicNumber; @@ -139,12 +163,6 @@ inline bool PrefixExtractorChangedHelper( } } -CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) { - CacheAllocationPtr heap_buf; - heap_buf = AllocateBlock(buf.size(), allocator); - memcpy(heap_buf.get(), buf.data(), buf.size()); - return heap_buf; -} } // namespace void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type, @@ -1653,298 +1671,6 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( return s; 
} -// This function reads multiple data blocks from disk using Env::MultiRead() -// and optionally inserts them into the block cache. It uses the scratch -// buffer provided by the caller, which is contiguous. If scratch is a nullptr -// it allocates a separate buffer for each block. Typically, if the blocks -// need to be uncompressed and there is no compressed block cache, callers -// can allocate a temporary scratch buffer in order to minimize memory -// allocations. -// If options.fill_cache is true, it inserts the blocks into cache. If its -// false and scratch is non-null and the blocks are uncompressed, it copies -// the buffers to heap. In any case, the CachableEntry returned will -// own the data bytes. -// If compression is enabled and also there is no compressed block cache, -// the adjacent blocks are read out in one IO (combined read) -// batch - A MultiGetRange with only those keys with unique data blocks not -// found in cache -// handles - A vector of block handles. Some of them me be NULL handles -// scratch - An optional contiguous buffer to read compressed blocks into -void BlockBasedTable::RetrieveMultipleBlocks( - const ReadOptions& options, const MultiGetRange* batch, - const autovector* handles, - autovector* statuses, - autovector, MultiGetContext::MAX_BATCH_SIZE>* results, - char* scratch, const UncompressionDict& uncompression_dict) const { - RandomAccessFileReader* file = rep_->file.get(); - const Footer& footer = rep_->footer; - const ImmutableOptions& ioptions = rep_->ioptions; - size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit; - MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options); - - if (ioptions.allow_mmap_reads) { - size_t idx_in_batch = 0; - for (auto mget_iter = batch->begin(); mget_iter != batch->end(); - ++mget_iter, ++idx_in_batch) { - BlockCacheLookupContext lookup_data_block_context( - TableReaderCaller::kUserMultiGet); - const BlockHandle& handle = (*handles)[idx_in_batch]; - if (handle.IsNull()) { - continue; - } - - (*statuses)[idx_in_batch] = - RetrieveBlock(nullptr, options, handle, uncompression_dict, - &(*results)[idx_in_batch], BlockType::kData, - mget_iter->get_context, &lookup_data_block_context, - /* for_compaction */ false, /* use_cache */ true, - /* wait_for_cache */ true); - } - return; - } - - // In direct IO mode, blocks share the direct io buffer. - // Otherwise, blocks share the scratch buffer. - const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr; - - autovector read_reqs; - size_t buf_offset = 0; - size_t idx_in_batch = 0; - - uint64_t prev_offset = 0; - size_t prev_len = 0; - autovector req_idx_for_block; - autovector req_offset_for_block; - for (auto mget_iter = batch->begin(); mget_iter != batch->end(); - ++mget_iter, ++idx_in_batch) { - const BlockHandle& handle = (*handles)[idx_in_batch]; - if (handle.IsNull()) { - continue; - } - - size_t prev_end = static_cast(prev_offset) + prev_len; - - // If current block is adjacent to the previous one, at the same time, - // compression is enabled and there is no compressed cache, we combine - // the two block read as one. - // We don't combine block reads here in direct IO mode, because when doing - // direct IO read, the block requests will be realigned and merged when - // necessary. 
- if (use_shared_buffer && !file->use_direct_io() && - prev_end == handle.offset()) { - req_offset_for_block.emplace_back(prev_len); - prev_len += BlockSizeWithTrailer(handle); - } else { - // No compression or current block and previous one is not adjacent: - // Step 1, create a new request for previous blocks - if (prev_len != 0) { - FSReadRequest req; - req.offset = prev_offset; - req.len = prev_len; - if (file->use_direct_io()) { - req.scratch = nullptr; - } else if (use_shared_buffer) { - req.scratch = scratch + buf_offset; - buf_offset += req.len; - } else { - req.scratch = new char[req.len]; - } - read_reqs.emplace_back(req); - } - - // Step 2, remeber the previous block info - prev_offset = handle.offset(); - prev_len = BlockSizeWithTrailer(handle); - req_offset_for_block.emplace_back(0); - } - req_idx_for_block.emplace_back(read_reqs.size()); - - PERF_COUNTER_ADD(block_read_count, 1); - PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle)); - } - // Handle the last block and process the pending last request - if (prev_len != 0) { - FSReadRequest req; - req.offset = prev_offset; - req.len = prev_len; - if (file->use_direct_io()) { - req.scratch = nullptr; - } else if (use_shared_buffer) { - req.scratch = scratch + buf_offset; - } else { - req.scratch = new char[req.len]; - } - read_reqs.emplace_back(req); - } - - AlignedBuf direct_io_buf; - { - IOOptions opts; - IOStatus s = file->PrepareIOOptions(options, opts); - if (s.ok()) { - s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf, - options.rate_limiter_priority); - } - if (!s.ok()) { - // Discard all the results in this batch if there is any time out - // or overall MultiRead error - for (FSReadRequest& req : read_reqs) { - req.status = s; - } - } - } - - idx_in_batch = 0; - size_t valid_batch_idx = 0; - for (auto mget_iter = batch->begin(); mget_iter != batch->end(); - ++mget_iter, ++idx_in_batch) { - const BlockHandle& handle = (*handles)[idx_in_batch]; - - if (handle.IsNull()) { - continue; - } - - assert(valid_batch_idx < req_idx_for_block.size()); - assert(valid_batch_idx < req_offset_for_block.size()); - assert(req_idx_for_block[valid_batch_idx] < read_reqs.size()); - size_t& req_idx = req_idx_for_block[valid_batch_idx]; - size_t& req_offset = req_offset_for_block[valid_batch_idx]; - valid_batch_idx++; - if (mget_iter->get_context) { - ++(mget_iter->get_context->get_context_stats_.num_data_read); - } - FSReadRequest& req = read_reqs[req_idx]; - Status s = req.status; - if (s.ok()) { - if ((req.result.size() != req.len) || - (req_offset + BlockSizeWithTrailer(handle) > req.result.size())) { - s = Status::Corruption("truncated block read from " + - rep_->file->file_name() + " offset " + - std::to_string(handle.offset()) + ", expected " + - std::to_string(req.len) + " bytes, got " + - std::to_string(req.result.size())); - } - } - - BlockContents raw_block_contents; - if (s.ok()) { - if (!use_shared_buffer) { - // We allocated a buffer for this block. Give ownership of it to - // BlockContents so it can free the memory - assert(req.result.data() == req.scratch); - assert(req.result.size() == BlockSizeWithTrailer(handle)); - assert(req_offset == 0); - std::unique_ptr raw_block(req.scratch); - raw_block_contents = BlockContents(std::move(raw_block), handle.size()); - } else { - // We used the scratch buffer or direct io buffer - // which are shared by the blocks. - // raw_block_contents does not have the ownership. 
- raw_block_contents = - BlockContents(Slice(req.result.data() + req_offset, handle.size())); - } -#ifndef NDEBUG - raw_block_contents.is_raw_block = true; -#endif - - if (options.verify_checksums) { - PERF_TIMER_GUARD(block_checksum_time); - const char* data = req.result.data(); - // Since the scratch might be shared, the offset of the data block in - // the buffer might not be 0. req.result.data() only point to the - // begin address of each read request, we need to add the offset - // in each read request. Checksum is stored in the block trailer, - // beyond the payload size. - s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset, - handle.size(), rep_->file->file_name(), - handle.offset()); - TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s); - } - } else if (!use_shared_buffer) { - // Free the allocated scratch buffer. - delete[] req.scratch; - } - - if (s.ok()) { - // When the blocks share the same underlying buffer (scratch or direct io - // buffer), we may need to manually copy the block into heap if the raw - // block has to be inserted into a cache. That falls into th following - // cases - - // 1. Raw block is not compressed, it needs to be inserted into the - // uncompressed block cache if there is one - // 2. If the raw block is compressed, it needs to be inserted into the - // compressed block cache if there is one - // - // In all other cases, the raw block is either uncompressed into a heap - // buffer or there is no cache at all. - CompressionType compression_type = - GetBlockCompressionType(raw_block_contents); - if (use_shared_buffer && (compression_type == kNoCompression || - (compression_type != kNoCompression && - rep_->table_options.block_cache_compressed))) { - Slice raw = - Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle)); - raw_block_contents = BlockContents( - CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw), - handle.size()); -#ifndef NDEBUG - raw_block_contents.is_raw_block = true; -#endif - } - } - - if (s.ok()) { - if (options.fill_cache) { - BlockCacheLookupContext lookup_data_block_context( - TableReaderCaller::kUserMultiGet); - CachableEntry* block_entry = &(*results)[idx_in_batch]; - // MaybeReadBlockAndLoadToCache will insert into the block caches if - // necessary. Since we're passing the raw block contents, it will - // avoid looking up the block cache - s = MaybeReadBlockAndLoadToCache( - nullptr, options, handle, uncompression_dict, /*wait=*/true, - /*for_compaction=*/false, block_entry, BlockType::kData, - mget_iter->get_context, &lookup_data_block_context, - &raw_block_contents); - - // block_entry value could be null if no block cache is present, i.e - // BlockBasedTableOptions::no_block_cache is true and no compressed - // block cache is configured. In that case, fall - // through and set up the block explicitly - if (block_entry->GetValue() != nullptr) { - s.PermitUncheckedError(); - continue; - } - } - - CompressionType compression_type = - GetBlockCompressionType(raw_block_contents); - BlockContents contents; - if (compression_type != kNoCompression) { - UncompressionContext context(compression_type); - UncompressionInfo info(context, uncompression_dict, compression_type); - s = UncompressBlockContents( - info, req.result.data() + req_offset, handle.size(), &contents, - footer.format_version(), rep_->ioptions, memory_allocator); - } else { - // There are two cases here: - // 1) caller uses the shared buffer (scratch or direct io buffer); - // 2) we use the requst buffer. 
- // If scratch buffer or direct io buffer is used, we ensure that - // all raw blocks are copyed to the heap as single blocks. If scratch - // buffer is not used, we also have no combined read, so the raw - // block can be used directly. - contents = std::move(raw_block_contents); - } - if (s.ok()) { - (*results)[idx_in_batch].SetOwnedValue(new Block( - std::move(contents), read_amp_bytes_per_bit, ioptions.stats)); - } - } - (*statuses)[idx_in_batch] = s; - } -} - template Status BlockBasedTable::RetrieveBlock( FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro, @@ -2525,443 +2251,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, return s; } -using MultiGetRange = MultiGetContext::Range; -void BlockBasedTable::MultiGet(const ReadOptions& read_options, - const MultiGetRange* mget_range, - const SliceTransform* prefix_extractor, - bool skip_filters) { - if (mget_range->empty()) { - // Caller should ensure non-empty (performance bug) - assert(false); - return; // Nothing to do - } - - FilterBlockReader* const filter = - !skip_filters ? rep_->filter.get() : nullptr; - MultiGetRange sst_file_range(*mget_range, mget_range->begin(), - mget_range->end()); - - // First check the full filter - // If full filter not useful, Then go into each block - const bool no_io = read_options.read_tier == kBlockCacheTier; - uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId; - if (sst_file_range.begin()->get_context) { - tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id(); - } - BlockCacheLookupContext lookup_context{ - TableReaderCaller::kUserMultiGet, tracing_mget_id, - /*get_from_user_specified_snapshot=*/read_options.snapshot != nullptr}; - FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor, - &lookup_context); - - if (!sst_file_range.empty()) { - IndexBlockIter iiter_on_stack; - // if prefix_extractor found in block differs from options, disable - // BlockPrefixIndex. Only do this check when index_type is kHashSearch. 
- bool need_upper_bound_check = false; - if (rep_->index_type == BlockBasedTableOptions::kHashSearch) { - need_upper_bound_check = PrefixExtractorChanged(prefix_extractor); - } - auto iiter = - NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack, - sst_file_range.begin()->get_context, &lookup_context); - std::unique_ptr> iiter_unique_ptr; - if (iiter != &iiter_on_stack) { - iiter_unique_ptr.reset(iiter); - } - - uint64_t prev_offset = std::numeric_limits::max(); - autovector block_handles; - autovector, MultiGetContext::MAX_BATCH_SIZE> results; - autovector statuses; - MultiGetContext::Mask reused_mask = 0; - char stack_buf[kMultiGetReadStackBufSize]; - std::unique_ptr block_buf; - { - MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(), - sst_file_range.end()); - std::vector cache_handles; - bool wait_for_cache_results = false; - - CachableEntry uncompression_dict; - Status uncompression_dict_status; - uncompression_dict_status.PermitUncheckedError(); - bool uncompression_dict_inited = false; - size_t total_len = 0; - ReadOptions ro = read_options; - ro.read_tier = kBlockCacheTier; - - for (auto miter = data_block_range.begin(); - miter != data_block_range.end(); ++miter) { - const Slice& key = miter->ikey; - iiter->Seek(miter->ikey); - - IndexValue v; - if (iiter->Valid()) { - v = iiter->value(); - } - if (!iiter->Valid() || - (!v.first_internal_key.empty() && !skip_filters && - UserComparatorWrapper(rep_->internal_comparator.user_comparator()) - .CompareWithoutTimestamp( - ExtractUserKey(key), - ExtractUserKey(v.first_internal_key)) < 0)) { - // The requested key falls between highest key in previous block and - // lowest key in current block. - if (!iiter->status().IsNotFound()) { - *(miter->s) = iiter->status(); - } - data_block_range.SkipKey(miter); - sst_file_range.SkipKey(miter); - continue; - } - - if (!uncompression_dict_inited && rep_->uncompression_dict_reader) { - uncompression_dict_status = - rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( - nullptr /* prefetch_buffer */, no_io, - read_options.verify_checksums, - sst_file_range.begin()->get_context, &lookup_context, - &uncompression_dict); - uncompression_dict_inited = true; - } - - if (!uncompression_dict_status.ok()) { - assert(!uncompression_dict_status.IsNotFound()); - *(miter->s) = uncompression_dict_status; - data_block_range.SkipKey(miter); - sst_file_range.SkipKey(miter); - continue; - } - - statuses.emplace_back(); - results.emplace_back(); - if (v.handle.offset() == prev_offset) { - // This key can reuse the previous block (later on). - // Mark previous as "reused" - reused_mask |= MultiGetContext::Mask{1} << (block_handles.size() - 1); - // Use null handle to indicate this one reuses same block as - // previous. - block_handles.emplace_back(BlockHandle::NullBlockHandle()); - continue; - } - // Lookup the cache for the given data block referenced by an index - // iterator value (i.e BlockHandle). If it exists in the cache, - // initialize block to the contents of the data block. - prev_offset = v.handle.offset(); - BlockHandle handle = v.handle; - BlockCacheLookupContext lookup_data_block_context( - TableReaderCaller::kUserMultiGet); - const UncompressionDict& dict = uncompression_dict.GetValue() - ? 
*uncompression_dict.GetValue() - : UncompressionDict::GetEmptyDict(); - Status s = RetrieveBlock( - nullptr, ro, handle, dict, &(results.back()), BlockType::kData, - miter->get_context, &lookup_data_block_context, - /* for_compaction */ false, /* use_cache */ true, - /* wait_for_cache */ false); - if (s.IsIncomplete()) { - s = Status::OK(); - } - if (s.ok() && !results.back().IsEmpty()) { - // Since we have a valid handle, check the value. If its nullptr, - // it means the cache is waiting for the final result and we're - // supposed to call WaitAll() to wait for the result. - if (results.back().GetValue() != nullptr) { - // Found it in the cache. Add NULL handle to indicate there is - // nothing to read from disk. - if (results.back().GetCacheHandle()) { - results.back().UpdateCachedValue(); - } - block_handles.emplace_back(BlockHandle::NullBlockHandle()); - } else { - // We have to wait for the cache lookup to finish in the - // background, and then we may have to read the block from disk - // anyway - assert(results.back().GetCacheHandle()); - wait_for_cache_results = true; - block_handles.emplace_back(handle); - cache_handles.emplace_back(results.back().GetCacheHandle()); - } - } else { - block_handles.emplace_back(handle); - total_len += BlockSizeWithTrailer(handle); - } - } - - if (wait_for_cache_results) { - Cache* block_cache = rep_->table_options.block_cache.get(); - block_cache->WaitAll(cache_handles); - for (size_t i = 0; i < block_handles.size(); ++i) { - // If this block was a success or failure or not needed because - // the corresponding key is in the same block as a prior key, skip - if (block_handles[i] == BlockHandle::NullBlockHandle() || - results[i].IsEmpty()) { - continue; - } - results[i].UpdateCachedValue(); - void* val = results[i].GetValue(); - if (!val) { - // The async cache lookup failed - could be due to an error - // or a false positive. We need to read the data block from - // the SST file - results[i].Reset(); - total_len += BlockSizeWithTrailer(block_handles[i]); - } else { - block_handles[i] = BlockHandle::NullBlockHandle(); - } - } - } - - if (total_len) { - char* scratch = nullptr; - const UncompressionDict& dict = uncompression_dict.GetValue() - ? *uncompression_dict.GetValue() - : UncompressionDict::GetEmptyDict(); - assert(uncompression_dict_inited || !rep_->uncompression_dict_reader); - assert(uncompression_dict_status.ok()); - // If using direct IO, then scratch is not used, so keep it nullptr. - // If the blocks need to be uncompressed and we don't need the - // compressed blocks, then we can use a contiguous block of - // memory to read in all the blocks as it will be temporary - // storage - // 1. If blocks are compressed and compressed block cache is there, - // alloc heap bufs - // 2. If blocks are uncompressed, alloc heap bufs - // 3. 
If blocks are compressed and no compressed block cache, use - // stack buf - if (!rep_->file->use_direct_io() && - rep_->table_options.block_cache_compressed == nullptr && - rep_->blocks_maybe_compressed) { - if (total_len <= kMultiGetReadStackBufSize) { - scratch = stack_buf; - } else { - scratch = new char[total_len]; - block_buf.reset(scratch); - } - } - RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles, - &statuses, &results, scratch, dict); - if (sst_file_range.begin()->get_context) { - ++(sst_file_range.begin() - ->get_context->get_context_stats_.num_sst_read); - } - } - } - - DataBlockIter first_biter; - DataBlockIter next_biter; - size_t idx_in_batch = 0; - SharedCleanablePtr shared_cleanable; - for (auto miter = sst_file_range.begin(); miter != sst_file_range.end(); - ++miter) { - Status s; - GetContext* get_context = miter->get_context; - const Slice& key = miter->ikey; - bool matched = false; // if such user key matched a key in SST - bool done = false; - bool first_block = true; - do { - DataBlockIter* biter = nullptr; - bool reusing_prev_block; - bool later_reused; - uint64_t referenced_data_size = 0; - bool does_referenced_key_exist = false; - BlockCacheLookupContext lookup_data_block_context( - TableReaderCaller::kUserMultiGet, tracing_mget_id, - /*get_from_user_specified_snapshot=*/read_options.snapshot != - nullptr); - if (first_block) { - if (!block_handles[idx_in_batch].IsNull() || - !results[idx_in_batch].IsEmpty()) { - first_biter.Invalidate(Status::OK()); - NewDataBlockIterator( - read_options, results[idx_in_batch], &first_biter, - statuses[idx_in_batch]); - reusing_prev_block = false; - } else { - // If handler is null and result is empty, then the status is never - // set, which should be the initial value: ok(). - assert(statuses[idx_in_batch].ok()); - reusing_prev_block = true; - } - biter = &first_biter; - later_reused = - (reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0; - idx_in_batch++; - } else { - IndexValue v = iiter->value(); - if (!v.first_internal_key.empty() && !skip_filters && - UserComparatorWrapper(rep_->internal_comparator.user_comparator()) - .CompareWithoutTimestamp( - ExtractUserKey(key), - ExtractUserKey(v.first_internal_key)) < 0) { - // The requested key falls between highest key in previous block and - // lowest key in current block. - break; - } - - next_biter.Invalidate(Status::OK()); - NewDataBlockIterator( - read_options, iiter->value().handle, &next_biter, - BlockType::kData, get_context, &lookup_data_block_context, - Status(), nullptr); - biter = &next_biter; - reusing_prev_block = false; - later_reused = false; - } - - if (read_options.read_tier == kBlockCacheTier && - biter->status().IsIncomplete()) { - // couldn't get block from block_cache - // Update Saver.state to Found because we are only looking for - // whether we can guarantee the key is not there when "no_io" is set - get_context->MarkKeyMayExist(); - break; - } - if (!biter->status().ok()) { - s = biter->status(); - break; - } - - bool may_exist = biter->SeekForGet(key); - if (!may_exist) { - // HashSeek cannot find the key this block and the the iter is not - // the end of the block, i.e. cannot be in the following blocks - // either. In this case, the seek_key cannot be found, so we break - // from the top level for-loop. - break; - } - - // Reusing blocks complicates pinning/Cleanable, because the cache - // entry referenced by biter can only be released once all returned - // pinned values are released. 
This code previously did an extra - // block_cache Ref for each reuse, but that unnecessarily increases - // block cache contention. Instead we can use a variant of shared_ptr - // to release in block cache only once. - // - // Although the biter loop below might SaveValue multiple times for - // merges, just one value_pinner suffices, as MultiGet will merge - // the operands before returning to the API user. - Cleanable* value_pinner; - if (biter->IsValuePinned()) { - if (reusing_prev_block) { - // Note that we don't yet know if the MultiGet results will need - // to pin this block, so we might wrap a block for sharing and - // still end up with 1 (or 0) pinning ref. Not ideal but OK. - // - // Here we avoid adding redundant cleanups if we didn't end up - // delegating the cleanup from last time around. - if (!biter->HasCleanups()) { - assert(shared_cleanable.get()); - if (later_reused) { - shared_cleanable.RegisterCopyWith(biter); - } else { - shared_cleanable.MoveAsCleanupTo(biter); - } - } - } else if (later_reused) { - assert(biter->HasCleanups()); - // Make the existing cleanups on `biter` sharable: - shared_cleanable.Allocate(); - // Move existing `biter` cleanup(s) to `shared_cleanable` - biter->DelegateCleanupsTo(&*shared_cleanable); - // Reference `shared_cleanable` as new cleanup for `biter` - shared_cleanable.RegisterCopyWith(biter); - } - assert(biter->HasCleanups()); - value_pinner = biter; - } else { - value_pinner = nullptr; - } - - // Call the *saver function on each entry/block until it returns false - for (; biter->Valid(); biter->Next()) { - ParsedInternalKey parsed_key; - Status pik_status = ParseInternalKey( - biter->key(), &parsed_key, false /* log_err_key */); // TODO - if (!pik_status.ok()) { - s = pik_status; - } - if (!get_context->SaveValue(parsed_key, biter->value(), &matched, - value_pinner)) { - if (get_context->State() == GetContext::GetState::kFound) { - does_referenced_key_exist = true; - referenced_data_size = - biter->key().size() + biter->value().size(); - } - done = true; - break; - } - s = biter->status(); - } - // Write the block cache access. - // XXX: There appear to be 'break' statements above that bypass this - // writing of the block cache trace record - if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() && - !reusing_prev_block) { - // Avoid making copy of block_key, cf_name, and referenced_key when - // constructing the access record. - Slice referenced_key; - if (does_referenced_key_exist) { - referenced_key = biter->key(); - } else { - referenced_key = key; - } - BlockCacheTraceRecord access_record( - rep_->ioptions.clock->NowMicros(), - /*block_key=*/"", lookup_data_block_context.block_type, - lookup_data_block_context.block_size, rep_->cf_id_for_tracing(), - /*cf_name=*/"", rep_->level_for_tracing(), - rep_->sst_number_for_tracing(), lookup_data_block_context.caller, - lookup_data_block_context.is_cache_hit, - lookup_data_block_context.no_insert, - lookup_data_block_context.get_id, - lookup_data_block_context.get_from_user_specified_snapshot, - /*referenced_key=*/"", referenced_data_size, - lookup_data_block_context.num_keys_in_block, - does_referenced_key_exist); - // TODO: Should handle status here? 
- block_cache_tracer_ - ->WriteBlockAccess(access_record, - lookup_data_block_context.block_key, - rep_->cf_name_for_tracing(), referenced_key) - .PermitUncheckedError(); - } - s = biter->status(); - if (done) { - // Avoid the extra Next which is expensive in two-level indexes - break; - } - if (first_block) { - iiter->Seek(key); - if (!iiter->Valid()) { - break; - } - } - first_block = false; - iiter->Next(); - } while (iiter->Valid()); - - if (matched && filter != nullptr && !filter->IsBlockBased()) { - RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE); - PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1, - rep_->level); - } - if (s.ok() && !iiter->status().IsNotFound()) { - s = iiter->status(); - } - *(miter->s) = s; - } -#ifdef ROCKSDB_ASSERT_STATUS_CHECKED - // Not sure why we need to do it. Should investigate more. - for (auto& st : statuses) { - st.PermitUncheckedError(); - } -#endif // ROCKSDB_ASSERT_STATUS_CHECKED - } -} - Status BlockBasedTable::Prefetch(const Slice* const begin, const Slice* const end) { auto& comparator = rep_->internal_comparator; diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index c9c656c3e0..385130d290 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -31,6 +31,7 @@ #include "table/table_reader.h" #include "table/two_level_iterator.h" #include "trace_replay/block_cache_tracer.h" +#include "util/coro_utils.h" #include "util/hash_containers.h" namespace ROCKSDB_NAMESPACE { @@ -141,10 +142,11 @@ class BlockBasedTable : public TableReader { GetContext* get_context, const SliceTransform* prefix_extractor, bool skip_filters = false) override; - void MultiGet(const ReadOptions& readOptions, - const MultiGetContext::Range* mget_range, - const SliceTransform* prefix_extractor, - bool skip_filters = false) override; + DECLARE_SYNC_AND_ASYNC_OVERRIDE(void, MultiGet, + const ReadOptions& readOptions, + const MultiGetContext::Range* mget_range, + const SliceTransform* prefix_extractor, + bool skip_filters = false); // Pre-fetch the disk blocks that correspond to the key range specified by // (kbegin, kend). The call will return error status in the event of @@ -366,13 +368,14 @@ class BlockBasedTable : public TableReader { bool for_compaction, bool use_cache, bool wait_for_cache) const; - void RetrieveMultipleBlocks( - const ReadOptions& options, const MultiGetRange* batch, + DECLARE_SYNC_AND_ASYNC_CONST( + void, RetrieveMultipleBlocks, const ReadOptions& options, + const MultiGetRange* batch, const autovector* handles, autovector* statuses, autovector, MultiGetContext::MAX_BATCH_SIZE>* results, - char* scratch, const UncompressionDict& uncompression_dict) const; + char* scratch, const UncompressionDict& uncompression_dict); // Get the iterator from the index reader. // diff --git a/table/block_based/block_based_table_reader_sync_and_async.h b/table/block_based/block_based_table_reader_sync_and_async.h new file mode 100644 index 0000000000..eaf07f215f --- /dev/null +++ b/table/block_based/block_based_table_reader_sync_and_async.h @@ -0,0 +1,748 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
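// Hedged illustration (not part of the patch) of what the DEFINE_SYNC_AND_ASYNC pattern
// used by this header boils down to. The real macros live in util/coro_utils.h, which is
// not shown in this diff; ReadBlockCount/ReadBlockCountCoroutine below are made-up names
// showing the two functions one handwritten body conceptually expands into, mirroring
// MultiGetFromSST vs. MultiGetFromSSTCoroutine earlier in the patch.
#include <folly/experimental/coro/Task.h>

// Conceptual WITHOUT_COROUTINES expansion: a plain synchronous function, where
// CO_RETURN behaves like return and CO_AWAIT is a direct call.
int ReadBlockCount(int files) { return files; }

// Conceptual WITH_COROUTINES expansion: the same body becomes a folly::coro::Task<>
// coroutine with a "Coroutine" suffix, and CO_RETURN / CO_AWAIT become
// co_return / co_await.
folly::coro::Task<int> ReadBlockCountCoroutine(int files) { co_return files; }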
+ +#include "util/async_file_reader.h" +#include "util/coro_utils.h" + +#if defined(WITHOUT_COROUTINES) || \ + (defined(USE_COROUTINES) && defined(WITH_COROUTINES)) + +namespace ROCKSDB_NAMESPACE { + +// This function reads multiple data blocks from disk using Env::MultiRead() +// and optionally inserts them into the block cache. It uses the scratch +// buffer provided by the caller, which is contiguous. If scratch is a nullptr +// it allocates a separate buffer for each block. Typically, if the blocks +// need to be uncompressed and there is no compressed block cache, callers +// can allocate a temporary scratch buffer in order to minimize memory +// allocations. +// If options.fill_cache is true, it inserts the blocks into cache. If its +// false and scratch is non-null and the blocks are uncompressed, it copies +// the buffers to heap. In any case, the CachableEntry returned will +// own the data bytes. +// If compression is enabled and also there is no compressed block cache, +// the adjacent blocks are read out in one IO (combined read) +// batch - A MultiGetRange with only those keys with unique data blocks not +// found in cache +// handles - A vector of block handles. Some of them me be NULL handles +// scratch - An optional contiguous buffer to read compressed blocks into +DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::RetrieveMultipleBlocks) +(const ReadOptions& options, const MultiGetRange* batch, + const autovector* handles, + autovector* statuses, + autovector, MultiGetContext::MAX_BATCH_SIZE>* results, + char* scratch, const UncompressionDict& uncompression_dict) const { + RandomAccessFileReader* file = rep_->file.get(); + const Footer& footer = rep_->footer; + const ImmutableOptions& ioptions = rep_->ioptions; + size_t read_amp_bytes_per_bit = rep_->table_options.read_amp_bytes_per_bit; + MemoryAllocator* memory_allocator = GetMemoryAllocator(rep_->table_options); + + if (ioptions.allow_mmap_reads) { + size_t idx_in_batch = 0; + for (auto mget_iter = batch->begin(); mget_iter != batch->end(); + ++mget_iter, ++idx_in_batch) { + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet); + const BlockHandle& handle = (*handles)[idx_in_batch]; + if (handle.IsNull()) { + continue; + } + + (*statuses)[idx_in_batch] = + RetrieveBlock(nullptr, options, handle, uncompression_dict, + &(*results)[idx_in_batch], BlockType::kData, + mget_iter->get_context, &lookup_data_block_context, + /* for_compaction */ false, /* use_cache */ true, + /* wait_for_cache */ true); + } + CO_RETURN; + } + + // In direct IO mode, blocks share the direct io buffer. + // Otherwise, blocks share the scratch buffer. + const bool use_shared_buffer = file->use_direct_io() || scratch != nullptr; + + autovector read_reqs; + size_t buf_offset = 0; + size_t idx_in_batch = 0; + + uint64_t prev_offset = 0; + size_t prev_len = 0; + autovector req_idx_for_block; + autovector req_offset_for_block; + for (auto mget_iter = batch->begin(); mget_iter != batch->end(); + ++mget_iter, ++idx_in_batch) { + const BlockHandle& handle = (*handles)[idx_in_batch]; + if (handle.IsNull()) { + continue; + } + + size_t prev_end = static_cast(prev_offset) + prev_len; + + // If current block is adjacent to the previous one, at the same time, + // compression is enabled and there is no compressed cache, we combine + // the two block read as one. + // We don't combine block reads here in direct IO mode, because when doing + // direct IO read, the block requests will be realigned and merged when + // necessary. 
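// Illustrative, self-contained sketch (hypothetical BlockSpan/ReadReq types, not
// RocksDB's BlockHandle/FSReadRequest) of the coalescing decision made in the branch
// directly below: adjacent blocks are folded into one read request so a single IO
// covers several blocks, while req_idx/req_offset remember, for every block, which
// request it ended up in and at what byte offset.
#include <cstddef>
#include <cstdint>
#include <vector>

struct BlockSpan { uint64_t offset; uint64_t len; };  // block plus trailer size
struct ReadReq { uint64_t offset; uint64_t len; };    // one combined IO

std::vector<ReadReq> CoalesceAdjacentBlocks(const std::vector<BlockSpan>& blocks,
                                            std::vector<size_t>* req_idx,
                                            std::vector<uint64_t>* req_offset) {
  std::vector<ReadReq> reqs;
  uint64_t prev_offset = 0;
  uint64_t prev_len = 0;
  for (const BlockSpan& b : blocks) {
    if (prev_len != 0 && prev_offset + prev_len == b.offset) {
      // Adjacent to the pending request: extend it and record where this
      // block starts inside it.
      req_offset->push_back(prev_len);
      prev_len += b.len;
    } else {
      // Not adjacent: flush the pending request and start a new one.
      if (prev_len != 0) {
        reqs.push_back({prev_offset, prev_len});
      }
      prev_offset = b.offset;
      prev_len = b.len;
      req_offset->push_back(0);
    }
    // The pending request will be emitted at index reqs.size().
    req_idx->push_back(reqs.size());
  }
  if (prev_len != 0) {
    reqs.push_back({prev_offset, prev_len});  // flush the last pending request
  }
  return reqs;
}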
+ if (use_shared_buffer && !file->use_direct_io() && + prev_end == handle.offset()) { + req_offset_for_block.emplace_back(prev_len); + prev_len += BlockSizeWithTrailer(handle); + } else { + // No compression or current block and previous one is not adjacent: + // Step 1, create a new request for previous blocks + if (prev_len != 0) { + FSReadRequest req; + req.offset = prev_offset; + req.len = prev_len; + if (file->use_direct_io()) { + req.scratch = nullptr; + } else if (use_shared_buffer) { + req.scratch = scratch + buf_offset; + buf_offset += req.len; + } else { + req.scratch = new char[req.len]; + } + read_reqs.emplace_back(req); + } + + // Step 2, remeber the previous block info + prev_offset = handle.offset(); + prev_len = BlockSizeWithTrailer(handle); + req_offset_for_block.emplace_back(0); + } + req_idx_for_block.emplace_back(read_reqs.size()); + + PERF_COUNTER_ADD(block_read_count, 1); + PERF_COUNTER_ADD(block_read_byte, BlockSizeWithTrailer(handle)); + } + // Handle the last block and process the pending last request + if (prev_len != 0) { + FSReadRequest req; + req.offset = prev_offset; + req.len = prev_len; + if (file->use_direct_io()) { + req.scratch = nullptr; + } else if (use_shared_buffer) { + req.scratch = scratch + buf_offset; + } else { + req.scratch = new char[req.len]; + } + read_reqs.emplace_back(req); + } + + AlignedBuf direct_io_buf; + { + IOOptions opts; + IOStatus s = file->PrepareIOOptions(options, opts); + if (s.ok()) { +#if defined(WITHOUT_COROUTINES) + s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf, + options.rate_limiter_priority); +#else // WITH_COROUTINES + co_await batch->context()->reader().MultiReadAsync( + file, opts, &read_reqs[0], read_reqs.size(), &direct_io_buf); +#endif // WITH_COROUTINES + } + if (!s.ok()) { + // Discard all the results in this batch if there is any time out + // or overall MultiRead error + for (FSReadRequest& req : read_reqs) { + req.status = s; + } + } + } + + idx_in_batch = 0; + size_t valid_batch_idx = 0; + for (auto mget_iter = batch->begin(); mget_iter != batch->end(); + ++mget_iter, ++idx_in_batch) { + const BlockHandle& handle = (*handles)[idx_in_batch]; + + if (handle.IsNull()) { + continue; + } + + assert(valid_batch_idx < req_idx_for_block.size()); + assert(valid_batch_idx < req_offset_for_block.size()); + assert(req_idx_for_block[valid_batch_idx] < read_reqs.size()); + size_t& req_idx = req_idx_for_block[valid_batch_idx]; + size_t& req_offset = req_offset_for_block[valid_batch_idx]; + valid_batch_idx++; + if (mget_iter->get_context) { + ++(mget_iter->get_context->get_context_stats_.num_data_read); + } + FSReadRequest& req = read_reqs[req_idx]; + Status s = req.status; + if (s.ok()) { + if ((req.result.size() != req.len) || + (req_offset + BlockSizeWithTrailer(handle) > req.result.size())) { + s = Status::Corruption("truncated block read from " + + rep_->file->file_name() + " offset " + + std::to_string(handle.offset()) + ", expected " + + std::to_string(req.len) + " bytes, got " + + std::to_string(req.result.size())); + } + } + + BlockContents raw_block_contents; + if (s.ok()) { + if (!use_shared_buffer) { + // We allocated a buffer for this block. 
Give ownership of it to + // BlockContents so it can free the memory + assert(req.result.data() == req.scratch); + assert(req.result.size() == BlockSizeWithTrailer(handle)); + assert(req_offset == 0); + std::unique_ptr raw_block(req.scratch); + raw_block_contents = BlockContents(std::move(raw_block), handle.size()); + } else { + // We used the scratch buffer or direct io buffer + // which are shared by the blocks. + // raw_block_contents does not have the ownership. + raw_block_contents = + BlockContents(Slice(req.result.data() + req_offset, handle.size())); + } +#ifndef NDEBUG + raw_block_contents.is_raw_block = true; +#endif + + if (options.verify_checksums) { + PERF_TIMER_GUARD(block_checksum_time); + const char* data = req.result.data(); + // Since the scratch might be shared, the offset of the data block in + // the buffer might not be 0. req.result.data() only point to the + // begin address of each read request, we need to add the offset + // in each read request. Checksum is stored in the block trailer, + // beyond the payload size. + s = VerifyBlockChecksum(footer.checksum_type(), data + req_offset, + handle.size(), rep_->file->file_name(), + handle.offset()); + TEST_SYNC_POINT_CALLBACK("RetrieveMultipleBlocks:VerifyChecksum", &s); + } + } else if (!use_shared_buffer) { + // Free the allocated scratch buffer. + delete[] req.scratch; + } + + if (s.ok()) { + // When the blocks share the same underlying buffer (scratch or direct io + // buffer), we may need to manually copy the block into heap if the raw + // block has to be inserted into a cache. That falls into th following + // cases - + // 1. Raw block is not compressed, it needs to be inserted into the + // uncompressed block cache if there is one + // 2. If the raw block is compressed, it needs to be inserted into the + // compressed block cache if there is one + // + // In all other cases, the raw block is either uncompressed into a heap + // buffer or there is no cache at all. + CompressionType compression_type = + GetBlockCompressionType(raw_block_contents); + if (use_shared_buffer && (compression_type == kNoCompression || + (compression_type != kNoCompression && + rep_->table_options.block_cache_compressed))) { + Slice raw = + Slice(req.result.data() + req_offset, BlockSizeWithTrailer(handle)); + raw_block_contents = BlockContents( + CopyBufferToHeap(GetMemoryAllocator(rep_->table_options), raw), + handle.size()); +#ifndef NDEBUG + raw_block_contents.is_raw_block = true; +#endif + } + } + + if (s.ok()) { + if (options.fill_cache) { + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet); + CachableEntry* block_entry = &(*results)[idx_in_batch]; + // MaybeReadBlockAndLoadToCache will insert into the block caches if + // necessary. Since we're passing the raw block contents, it will + // avoid looking up the block cache + s = MaybeReadBlockAndLoadToCache( + nullptr, options, handle, uncompression_dict, /*wait=*/true, + /*for_compaction=*/false, block_entry, BlockType::kData, + mget_iter->get_context, &lookup_data_block_context, + &raw_block_contents); + + // block_entry value could be null if no block cache is present, i.e + // BlockBasedTableOptions::no_block_cache is true and no compressed + // block cache is configured. 
In that case, fall + // through and set up the block explicitly + if (block_entry->GetValue() != nullptr) { + s.PermitUncheckedError(); + continue; + } + } + + CompressionType compression_type = + GetBlockCompressionType(raw_block_contents); + BlockContents contents; + if (compression_type != kNoCompression) { + UncompressionContext context(compression_type); + UncompressionInfo info(context, uncompression_dict, compression_type); + s = UncompressBlockContents( + info, req.result.data() + req_offset, handle.size(), &contents, + footer.format_version(), rep_->ioptions, memory_allocator); + } else { + // There are two cases here: + // 1) caller uses the shared buffer (scratch or direct io buffer); + // 2) we use the requst buffer. + // If scratch buffer or direct io buffer is used, we ensure that + // all raw blocks are copyed to the heap as single blocks. If scratch + // buffer is not used, we also have no combined read, so the raw + // block can be used directly. + contents = std::move(raw_block_contents); + } + if (s.ok()) { + (*results)[idx_in_batch].SetOwnedValue(new Block( + std::move(contents), read_amp_bytes_per_bit, ioptions.stats)); + } + } + (*statuses)[idx_in_batch] = s; + } +} + +using MultiGetRange = MultiGetContext::Range; +DEFINE_SYNC_AND_ASYNC(void, BlockBasedTable::MultiGet) +(const ReadOptions& read_options, const MultiGetRange* mget_range, + const SliceTransform* prefix_extractor, bool skip_filters) { + if (mget_range->empty()) { + // Caller should ensure non-empty (performance bug) + assert(false); + CO_RETURN; // Nothing to do + } + + FilterBlockReader* const filter = + !skip_filters ? rep_->filter.get() : nullptr; + MultiGetRange sst_file_range(*mget_range, mget_range->begin(), + mget_range->end()); + + // First check the full filter + // If full filter not useful, Then go into each block + const bool no_io = read_options.read_tier == kBlockCacheTier; + uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId; + if (sst_file_range.begin()->get_context) { + tracing_mget_id = sst_file_range.begin()->get_context->get_tracing_get_id(); + } + BlockCacheLookupContext lookup_context{ + TableReaderCaller::kUserMultiGet, tracing_mget_id, + /*_get_from_user_specified_snapshot=*/read_options.snapshot != nullptr}; + FullFilterKeysMayMatch(filter, &sst_file_range, no_io, prefix_extractor, + &lookup_context); + + if (!sst_file_range.empty()) { + IndexBlockIter iiter_on_stack; + // if prefix_extractor found in block differs from options, disable + // BlockPrefixIndex. Only do this check when index_type is kHashSearch. 
+ bool need_upper_bound_check = false; + if (rep_->index_type == BlockBasedTableOptions::kHashSearch) { + need_upper_bound_check = PrefixExtractorChanged(prefix_extractor); + } + auto iiter = + NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack, + sst_file_range.begin()->get_context, &lookup_context); + std::unique_ptr> iiter_unique_ptr; + if (iiter != &iiter_on_stack) { + iiter_unique_ptr.reset(iiter); + } + + uint64_t prev_offset = std::numeric_limits::max(); + autovector block_handles; + autovector, MultiGetContext::MAX_BATCH_SIZE> results; + autovector statuses; + MultiGetContext::Mask reused_mask = 0; + char stack_buf[kMultiGetReadStackBufSize]; + std::unique_ptr block_buf; + { + MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(), + sst_file_range.end()); + std::vector cache_handles; + bool wait_for_cache_results = false; + + CachableEntry uncompression_dict; + Status uncompression_dict_status; + uncompression_dict_status.PermitUncheckedError(); + bool uncompression_dict_inited = false; + size_t total_len = 0; + ReadOptions ro = read_options; + ro.read_tier = kBlockCacheTier; + + for (auto miter = data_block_range.begin(); + miter != data_block_range.end(); ++miter) { + const Slice& key = miter->ikey; + iiter->Seek(miter->ikey); + + IndexValue v; + if (iiter->Valid()) { + v = iiter->value(); + } + if (!iiter->Valid() || + (!v.first_internal_key.empty() && !skip_filters && + UserComparatorWrapper(rep_->internal_comparator.user_comparator()) + .CompareWithoutTimestamp( + ExtractUserKey(key), + ExtractUserKey(v.first_internal_key)) < 0)) { + // The requested key falls between highest key in previous block and + // lowest key in current block. + if (!iiter->status().IsNotFound()) { + *(miter->s) = iiter->status(); + } + data_block_range.SkipKey(miter); + sst_file_range.SkipKey(miter); + continue; + } + + if (!uncompression_dict_inited && rep_->uncompression_dict_reader) { + uncompression_dict_status = + rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( + nullptr /* prefetch_buffer */, no_io, + read_options.verify_checksums, + sst_file_range.begin()->get_context, &lookup_context, + &uncompression_dict); + uncompression_dict_inited = true; + } + + if (!uncompression_dict_status.ok()) { + assert(!uncompression_dict_status.IsNotFound()); + *(miter->s) = uncompression_dict_status; + data_block_range.SkipKey(miter); + sst_file_range.SkipKey(miter); + continue; + } + + statuses.emplace_back(); + results.emplace_back(); + if (v.handle.offset() == prev_offset) { + // This key can reuse the previous block (later on). + // Mark previous as "reused" + reused_mask |= MultiGetContext::Mask{1} << (block_handles.size() - 1); + // Use null handle to indicate this one reuses same block as + // previous. + block_handles.emplace_back(BlockHandle::NullBlockHandle()); + continue; + } + // Lookup the cache for the given data block referenced by an index + // iterator value (i.e BlockHandle). If it exists in the cache, + // initialize block to the contents of the data block. + prev_offset = v.handle.offset(); + BlockHandle handle = v.handle; + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet); + const UncompressionDict& dict = uncompression_dict.GetValue() + ? 
*uncompression_dict.GetValue() + : UncompressionDict::GetEmptyDict(); + Status s = RetrieveBlock( + nullptr, ro, handle, dict, &(results.back()), BlockType::kData, + miter->get_context, &lookup_data_block_context, + /* for_compaction */ false, /* use_cache */ true, + /* wait_for_cache */ false); + if (s.IsIncomplete()) { + s = Status::OK(); + } + if (s.ok() && !results.back().IsEmpty()) { + // Since we have a valid handle, check the value. If its nullptr, + // it means the cache is waiting for the final result and we're + // supposed to call WaitAll() to wait for the result. + if (results.back().GetValue() != nullptr) { + // Found it in the cache. Add NULL handle to indicate there is + // nothing to read from disk. + if (results.back().GetCacheHandle()) { + results.back().UpdateCachedValue(); + } + block_handles.emplace_back(BlockHandle::NullBlockHandle()); + } else { + // We have to wait for the cache lookup to finish in the + // background, and then we may have to read the block from disk + // anyway + assert(results.back().GetCacheHandle()); + wait_for_cache_results = true; + block_handles.emplace_back(handle); + cache_handles.emplace_back(results.back().GetCacheHandle()); + } + } else { + block_handles.emplace_back(handle); + total_len += BlockSizeWithTrailer(handle); + } + } + + if (wait_for_cache_results) { + Cache* block_cache = rep_->table_options.block_cache.get(); + block_cache->WaitAll(cache_handles); + for (size_t i = 0; i < block_handles.size(); ++i) { + // If this block was a success or failure or not needed because + // the corresponding key is in the same block as a prior key, skip + if (block_handles[i] == BlockHandle::NullBlockHandle() || + results[i].IsEmpty()) { + continue; + } + results[i].UpdateCachedValue(); + void* val = results[i].GetValue(); + if (!val) { + // The async cache lookup failed - could be due to an error + // or a false positive. We need to read the data block from + // the SST file + results[i].Reset(); + total_len += BlockSizeWithTrailer(block_handles[i]); + } else { + block_handles[i] = BlockHandle::NullBlockHandle(); + } + } + } + + if (total_len) { + char* scratch = nullptr; + const UncompressionDict& dict = uncompression_dict.GetValue() + ? *uncompression_dict.GetValue() + : UncompressionDict::GetEmptyDict(); + assert(uncompression_dict_inited || !rep_->uncompression_dict_reader); + assert(uncompression_dict_status.ok()); + // If using direct IO, then scratch is not used, so keep it nullptr. + // If the blocks need to be uncompressed and we don't need the + // compressed blocks, then we can use a contiguous block of + // memory to read in all the blocks as it will be temporary + // storage + // 1. If blocks are compressed and compressed block cache is there, + // alloc heap bufs + // 2. If blocks are uncompressed, alloc heap bufs + // 3. 
If blocks are compressed and no compressed block cache, use + // stack buf + if (!rep_->file->use_direct_io() && + rep_->table_options.block_cache_compressed == nullptr && + rep_->blocks_maybe_compressed) { + if (total_len <= kMultiGetReadStackBufSize) { + scratch = stack_buf; + } else { + scratch = new char[total_len]; + block_buf.reset(scratch); + } + } + CO_AWAIT(RetrieveMultipleBlocks) + (read_options, &data_block_range, &block_handles, &statuses, &results, + scratch, dict); + if (sst_file_range.begin()->get_context) { + ++(sst_file_range.begin() + ->get_context->get_context_stats_.num_sst_read); + } + } + } + + DataBlockIter first_biter; + DataBlockIter next_biter; + size_t idx_in_batch = 0; + SharedCleanablePtr shared_cleanable; + for (auto miter = sst_file_range.begin(); miter != sst_file_range.end(); + ++miter) { + Status s; + GetContext* get_context = miter->get_context; + const Slice& key = miter->ikey; + bool matched = false; // if such user key matched a key in SST + bool done = false; + bool first_block = true; + do { + DataBlockIter* biter = nullptr; + bool reusing_prev_block; + bool later_reused; + uint64_t referenced_data_size = 0; + bool does_referenced_key_exist = false; + BlockCacheLookupContext lookup_data_block_context( + TableReaderCaller::kUserMultiGet, tracing_mget_id, + /*_get_from_user_specified_snapshot=*/read_options.snapshot != + nullptr); + if (first_block) { + if (!block_handles[idx_in_batch].IsNull() || + !results[idx_in_batch].IsEmpty()) { + first_biter.Invalidate(Status::OK()); + NewDataBlockIterator( + read_options, results[idx_in_batch], &first_biter, + statuses[idx_in_batch]); + reusing_prev_block = false; + } else { + // If handler is null and result is empty, then the status is never + // set, which should be the initial value: ok(). + assert(statuses[idx_in_batch].ok()); + reusing_prev_block = true; + } + biter = &first_biter; + later_reused = + (reused_mask & (MultiGetContext::Mask{1} << idx_in_batch)) != 0; + idx_in_batch++; + } else { + IndexValue v = iiter->value(); + if (!v.first_internal_key.empty() && !skip_filters && + UserComparatorWrapper(rep_->internal_comparator.user_comparator()) + .CompareWithoutTimestamp( + ExtractUserKey(key), + ExtractUserKey(v.first_internal_key)) < 0) { + // The requested key falls between highest key in previous block and + // lowest key in current block. + break; + } + + next_biter.Invalidate(Status::OK()); + NewDataBlockIterator( + read_options, iiter->value().handle, &next_biter, + BlockType::kData, get_context, &lookup_data_block_context, + Status(), nullptr); + biter = &next_biter; + reusing_prev_block = false; + later_reused = false; + } + + if (read_options.read_tier == kBlockCacheTier && + biter->status().IsIncomplete()) { + // couldn't get block from block_cache + // Update Saver.state to Found because we are only looking for + // whether we can guarantee the key is not there when "no_io" is set + get_context->MarkKeyMayExist(); + break; + } + if (!biter->status().ok()) { + s = biter->status(); + break; + } + + bool may_exist = biter->SeekForGet(key); + if (!may_exist) { + // HashSeek cannot find the key this block and the the iter is not + // the end of the block, i.e. cannot be in the following blocks + // either. In this case, the seek_key cannot be found, so we break + // from the top level for-loop. + break; + } + + // Reusing blocks complicates pinning/Cleanable, because the cache + // entry referenced by biter can only be released once all returned + // pinned values are released. 
This code previously did an extra + // block_cache Ref for each reuse, but that unnecessarily increases + // block cache contention. Instead we can use a variant of shared_ptr + // to release in block cache only once. + // + // Although the biter loop below might SaveValue multiple times for + // merges, just one value_pinner suffices, as MultiGet will merge + // the operands before returning to the API user. + Cleanable* value_pinner; + if (biter->IsValuePinned()) { + if (reusing_prev_block) { + // Note that we don't yet know if the MultiGet results will need + // to pin this block, so we might wrap a block for sharing and + // still end up with 1 (or 0) pinning ref. Not ideal but OK. + // + // Here we avoid adding redundant cleanups if we didn't end up + // delegating the cleanup from last time around. + if (!biter->HasCleanups()) { + assert(shared_cleanable.get()); + if (later_reused) { + shared_cleanable.RegisterCopyWith(biter); + } else { + shared_cleanable.MoveAsCleanupTo(biter); + } + } + } else if (later_reused) { + assert(biter->HasCleanups()); + // Make the existing cleanups on `biter` sharable: + shared_cleanable.Allocate(); + // Move existing `biter` cleanup(s) to `shared_cleanable` + biter->DelegateCleanupsTo(&*shared_cleanable); + // Reference `shared_cleanable` as new cleanup for `biter` + shared_cleanable.RegisterCopyWith(biter); + } + assert(biter->HasCleanups()); + value_pinner = biter; + } else { + value_pinner = nullptr; + } + + // Call the *saver function on each entry/block until it returns false + for (; biter->Valid(); biter->Next()) { + ParsedInternalKey parsed_key; + Status pik_status = ParseInternalKey( + biter->key(), &parsed_key, false /* log_err_key */); // TODO + if (!pik_status.ok()) { + s = pik_status; + } + if (!get_context->SaveValue(parsed_key, biter->value(), &matched, + value_pinner)) { + if (get_context->State() == GetContext::GetState::kFound) { + does_referenced_key_exist = true; + referenced_data_size = + biter->key().size() + biter->value().size(); + } + done = true; + break; + } + s = biter->status(); + } + // Write the block cache access. + // XXX: There appear to be 'break' statements above that bypass this + // writing of the block cache trace record + if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() && + !reusing_prev_block) { + // Avoid making copy of block_key, cf_name, and referenced_key when + // constructing the access record. + Slice referenced_key; + if (does_referenced_key_exist) { + referenced_key = biter->key(); + } else { + referenced_key = key; + } + BlockCacheTraceRecord access_record( + rep_->ioptions.clock->NowMicros(), + /*_block_key=*/"", lookup_data_block_context.block_type, + lookup_data_block_context.block_size, rep_->cf_id_for_tracing(), + /*_cf_name=*/"", rep_->level_for_tracing(), + rep_->sst_number_for_tracing(), lookup_data_block_context.caller, + lookup_data_block_context.is_cache_hit, + lookup_data_block_context.no_insert, + lookup_data_block_context.get_id, + lookup_data_block_context.get_from_user_specified_snapshot, + /*_referenced_key=*/"", referenced_data_size, + lookup_data_block_context.num_keys_in_block, + does_referenced_key_exist); + // TODO: Should handle status here? 
+ block_cache_tracer_ + ->WriteBlockAccess(access_record, + lookup_data_block_context.block_key, + rep_->cf_name_for_tracing(), referenced_key) + .PermitUncheckedError(); + } + s = biter->status(); + if (done) { + // Avoid the extra Next which is expensive in two-level indexes + break; + } + if (first_block) { + iiter->Seek(key); + if (!iiter->Valid()) { + break; + } + } + first_block = false; + iiter->Next(); + } while (iiter->Valid()); + + if (matched && filter != nullptr && !filter->IsBlockBased()) { + RecordTick(rep_->ioptions.stats, BLOOM_FILTER_FULL_TRUE_POSITIVE); + PERF_COUNTER_BY_LEVEL_ADD(bloom_filter_full_true_positive, 1, + rep_->level); + } + if (s.ok() && !iiter->status().IsNotFound()) { + s = iiter->status(); + } + *(miter->s) = s; + } +#ifdef ROCKSDB_ASSERT_STATUS_CHECKED + // Not sure why we need to do it. Should investigate more. + for (auto& st : statuses) { + st.PermitUncheckedError(); + } +#endif // ROCKSDB_ASSERT_STATUS_CHECKED + } +} +} // namespace ROCKSDB_NAMESPACE +#endif diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc index 47172dc2c7..4c2844f53b 100644 --- a/table/block_based/block_based_table_reader_test.cc +++ b/table/block_based/block_based_table_reader_test.cc @@ -259,7 +259,8 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) { for (auto& key_ctx : key_context) { sorted_keys.emplace_back(&key_ctx); } - MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions()); + MultiGetContext ctx(&sorted_keys, 0, sorted_keys.size(), 0, ReadOptions(), + fs_.get(), nullptr); // Execute MultiGet. MultiGetContext::Range range = ctx.GetMultiGetRange(); diff --git a/table/multiget_context.h b/table/multiget_context.h index f2177c66db..ca29816f57 100644 --- a/table/multiget_context.h +++ b/table/multiget_context.h @@ -14,8 +14,10 @@ #include "rocksdb/env.h" #include "rocksdb/statistics.h" #include "rocksdb/types.h" +#include "util/async_file_reader.h" #include "util/autovector.h" #include "util/math.h" +#include "util/single_thread_executor.h" namespace ROCKSDB_NAMESPACE { class GetContext; @@ -104,11 +106,20 @@ class MultiGetContext { MultiGetContext(autovector* sorted_keys, size_t begin, size_t num_keys, SequenceNumber snapshot, - const ReadOptions& read_opts) + const ReadOptions& read_opts, FileSystem* fs, + Statistics* stats) : num_keys_(num_keys), value_mask_(0), value_size_(0), - lookup_key_ptr_(reinterpret_cast(lookup_key_stack_buf)) { + lookup_key_ptr_(reinterpret_cast(lookup_key_stack_buf)) +#if USE_COROUTINES + , + reader_(fs, stats), + executor_(reader_) +#endif // USE_COROUTINES + { + (void)fs; + (void)stats; assert(num_keys <= MAX_BATCH_SIZE); if (num_keys > MAX_LOOKUP_KEYS_ON_STACK) { lookup_key_heap_buf.reset(new char[sizeof(LookupKey) * num_keys]); @@ -135,6 +146,12 @@ class MultiGetContext { } } +#if USE_COROUTINES + SingleThreadExecutor& executor() { return executor_; } + + AsyncFileReader& reader() { return reader_; } +#endif // USE_COROUTINES + private: static const int MAX_LOOKUP_KEYS_ON_STACK = 16; alignas(alignof(LookupKey)) @@ -145,6 +162,10 @@ class MultiGetContext { uint64_t value_size_; std::unique_ptr lookup_key_heap_buf; LookupKey* lookup_key_ptr_; +#if USE_COROUTINES + AsyncFileReader reader_; + SingleThreadExecutor executor_; +#endif // USE_COROUTINES public: // MultiGetContext::Range - Specifies a range of keys, by start and end index, @@ -267,6 +288,20 @@ class MultiGetContext { void AddValueSize(uint64_t value_size) { ctx_->value_size_ += value_size; } + 
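The MultiGetContext changes above follow a conditional-member pattern: the AsyncFileReader and SingleThreadExecutor members exist only under USE_COROUTINES, and the new FileSystem/Statistics constructor arguments are consumed with (void) casts so non-coroutine builds compile without unused-parameter warnings. Below is a minimal standalone sketch of that pattern; Reader and Context are illustrative names, not RocksDB types.

#include <iostream>

struct Reader {  // stand-in for the conditionally compiled AsyncFileReader
  explicit Reader(int fd) : fd_(fd) {}
  int fd_;
};

class Context {
 public:
  Context(int fd, bool verbose)
#if defined(USE_COROUTINES)
      : reader_(fd)
#endif
  {
    (void)fd;       // only used when USE_COROUTINES is defined
    (void)verbose;  // same idea as the (void)fs / (void)stats casts above
  }

#if defined(USE_COROUTINES)
  Reader& reader() { return reader_; }

 private:
  Reader reader_;
#endif
};

int main() {
  Context ctx(3, true);
#if defined(USE_COROUTINES)
  std::cout << "reader fd: " << ctx.reader().fd_ << "\n";
#else
  std::cout << "built without coroutines\n";
#endif
  return 0;
}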
MultiGetContext* context() const { return ctx_; } + + Range Suffix(const Range& other) const { + size_t other_last = other.FindLastRemaining(); + size_t my_last = FindLastRemaining(); + + if (my_last > other_last) { + return Range(*this, Iterator(this, other_last), + Iterator(this, my_last)); + } else { + return Range(*this, begin(), begin()); + } + } + private: friend MultiGetContext; MultiGetContext* ctx_; @@ -283,6 +318,15 @@ class MultiGetContext { return (((Mask{1} << end_) - 1) & ~((Mask{1} << start_) - 1) & ~(ctx_->value_mask_ | skip_mask_)); } + + size_t FindLastRemaining() const { + Mask mask = RemainingMask(); + size_t index = (mask >>= start_) ? start_ : 0; + while (mask >>= 1) { + index++; + } + return index; + } }; // Return the initial range that encompasses all the keys in the batch diff --git a/table/table_reader.h b/table/table_reader.h index 3631705c4b..c1d98c1431 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -10,6 +10,10 @@ #pragma once #include #include "db/range_tombstone_fragmenter.h" +#if USE_COROUTINES +#include "folly/experimental/coro/Coroutine.h" +#include "folly/experimental/coro/Task.h" +#endif #include "rocksdb/slice_transform.h" #include "table/get_context.h" #include "table/internal_iterator.h" @@ -120,6 +124,15 @@ class TableReader { } } +#if USE_COROUTINES + virtual folly::coro::Task MultiGetCoroutine( + const ReadOptions& readOptions, const MultiGetContext::Range* mget_range, + const SliceTransform* prefix_extractor, bool skip_filters = false) { + MultiGet(readOptions, mget_range, prefix_extractor, skip_filters); + co_return; + } +#endif // USE_COROUTINES + // Prefetch data corresponding to a give range of keys // Typically this functionality is required for table implementations that // persists the data on a non volatile storage medium like disk/SSD diff --git a/util/async_file_reader.cc b/util/async_file_reader.cc new file mode 100644 index 0000000000..f8fad5d4d1 --- /dev/null +++ b/util/async_file_reader.cc @@ -0,0 +1,72 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
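The Suffix()/FindLastRemaining() additions above track the batch as a bitmask of still-pending keys and locate the highest pending index by shifting. The standalone sketch below reproduces just that bit scan (same loop shape as FindLastRemaining(); the surrounding Range/Mask machinery is omitted).

#include <cstdint>
#include <iostream>

using Mask = uint64_t;

// Walk the mask right-shifting until it is exhausted, counting how far the
// top set bit is. Mirrors FindLastRemaining() above.
size_t FindLastSet(Mask mask, size_t start) {
  size_t index = (mask >>= start) ? start : 0;
  while (mask >>= 1) {
    index++;
  }
  return index;
}

int main() {
  // Keys 0..7 in the batch; keys 2, 3 and 6 are still unresolved.
  Mask remaining = (Mask{1} << 2) | (Mask{1} << 3) | (Mask{1} << 6);
  std::cout << FindLastSet(remaining, /*start=*/0) << "\n";  // prints 6
  return 0;
}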
+// +#if USE_COROUTINES +#include "util/async_file_reader.h" + +namespace ROCKSDB_NAMESPACE { +bool AsyncFileReader::MultiReadAsyncImpl(ReadAwaiter* awaiter) { + if (tail_) { + tail_->next_ = awaiter; + } + tail_ = awaiter; + if (!head_) { + head_ = awaiter; + } + num_reqs_ += awaiter->num_reqs_; + awaiter->io_handle_.resize(awaiter->num_reqs_); + awaiter->del_fn_.resize(awaiter->num_reqs_); + for (size_t i = 0; i < awaiter->num_reqs_; ++i) { + awaiter->file_ + ->ReadAsync( + awaiter->read_reqs_[i], awaiter->opts_, + [](const FSReadRequest& req, void* cb_arg) { + FSReadRequest* read_req = static_cast(cb_arg); + read_req->status = req.status; + read_req->result = req.result; + }, + &awaiter->read_reqs_[i], &awaiter->io_handle_[i], + &awaiter->del_fn_[i], Env::IOPriority::IO_TOTAL) + .PermitUncheckedError(); + } + return true; +} + +void AsyncFileReader::Wait() { + if (!head_) { + return; + } + ReadAwaiter* waiter; + std::vector io_handles; + io_handles.reserve(num_reqs_); + waiter = head_; + do { + for (size_t i = 0; i < waiter->num_reqs_; ++i) { + if (waiter->io_handle_[i]) { + io_handles.push_back(waiter->io_handle_[i]); + } + } + } while (waiter != tail_ && (waiter = waiter->next_)); + if (io_handles.size() > 0) { + StopWatch sw(SystemClock::Default().get(), stats_, POLL_WAIT_MICROS); + fs_->Poll(io_handles, io_handles.size()).PermitUncheckedError(); + } + do { + waiter = head_; + head_ = waiter->next_; + + for (size_t i = 0; i < waiter->num_reqs_; ++i) { + if (waiter->io_handle_[i] && waiter->del_fn_[i]) { + waiter->del_fn_[i](waiter->io_handle_[i]); + } + } + waiter->awaiting_coro_.resume(); + } while (waiter != tail_); + head_ = tail_ = nullptr; + RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_reqs_); + num_reqs_ = 0; +} +} // namespace ROCKSDB_NAMESPACE +#endif // USE_COROUTINES diff --git a/util/async_file_reader.h b/util/async_file_reader.h new file mode 100644 index 0000000000..82b240339b --- /dev/null +++ b/util/async_file_reader.h @@ -0,0 +1,143 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#if USE_COROUTINES +#include "file/random_access_file_reader.h" +#include "folly/experimental/coro/ViaIfAsync.h" +#include "port/port.h" +#include "rocksdb/file_system.h" +#include "rocksdb/statistics.h" +#include "util/autovector.h" +#include "util/stop_watch.h" + +namespace ROCKSDB_NAMESPACE { +class SingleThreadExecutor; + +// AsyncFileReader implements the Awaitable concept, which allows calling +// coroutines to co_await it. When the AsyncFileReader Awaitable is +// resumed, it initiates the fie reads requested by the awaiting caller +// by calling RandomAccessFileReader's ReadAsync. It then suspends the +// awaiting coroutine. The suspended awaiter is later resumed by Wait(). 
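AsyncFileReader strings suspended awaiters together through the next_ pointer they already carry, so queueing costs no allocation, and Wait() later walks head_ to tail_ resuming each one in FIFO order. The sketch below is a standalone illustration of just that intrusive queue and drain; the IO submission, the Poll() call, and the coroutine handles are replaced by plain callbacks, and the names are hypothetical.

#include <functional>
#include <iostream>

struct Awaiter {
  std::function<void()> resume;  // stands in for awaiting_coro_.resume()
  Awaiter* next_ = nullptr;
};

class WaiterQueue {
 public:
  // Same linking steps as MultiReadAsyncImpl() above.
  void Enqueue(Awaiter* a) {
    if (tail_) tail_->next_ = a;
    tail_ = a;
    if (!head_) head_ = a;
  }

  // Mirrors the shape of AsyncFileReader::Wait(): walk head_..tail_,
  // "resume" each awaiter, then clear the list.
  void DrainAll() {
    if (!head_) return;
    Awaiter* w = nullptr;
    do {
      w = head_;
      head_ = w->next_;
      w->resume();
    } while (w != tail_);
    head_ = tail_ = nullptr;
  }

 private:
  Awaiter* head_ = nullptr;
  Awaiter* tail_ = nullptr;
};

int main() {
  WaiterQueue q;
  Awaiter a{[] { std::cout << "resume A\n"; }};
  Awaiter b{[] { std::cout << "resume B\n"; }};
  q.Enqueue(&a);
  q.Enqueue(&b);
  q.DrainAll();  // prints "resume A" then "resume B"
  return 0;
}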
+class AsyncFileReader { + class ReadAwaiter; + template + class ReadOperation; + + public: + AsyncFileReader(FileSystem* fs, Statistics* stats) : fs_(fs), stats_(stats) {} + + ~AsyncFileReader() {} + + ReadOperation MultiReadAsync(RandomAccessFileReader* file, + const IOOptions& opts, + FSReadRequest* read_reqs, + size_t num_reqs, + AlignedBuf* aligned_buf) noexcept { + return ReadOperation{*this, file, opts, + read_reqs, num_reqs, aligned_buf}; + } + + private: + friend SingleThreadExecutor; + + // Implementation of the Awaitable concept + class ReadAwaiter { + public: + explicit ReadAwaiter(AsyncFileReader& reader, RandomAccessFileReader* file, + const IOOptions& opts, FSReadRequest* read_reqs, + size_t num_reqs, AlignedBuf* /*aligned_buf*/) noexcept + : reader_(reader), + file_(file), + opts_(opts), + read_reqs_(read_reqs), + num_reqs_(num_reqs) {} + + bool await_ready() noexcept { return false; } + + // A return value of true means suspend the awaiter (calling coroutine). The + // awaiting_coro parameter is the handle of the awaiter. The handle can be + // resumed later, so we cache it here. + bool await_suspend( + std::experimental::coroutine_handle<> awaiting_coro) noexcept { + awaiting_coro_ = awaiting_coro; + // MultiReadAsyncImpl always returns true, so caller will be suspended + return reader_.MultiReadAsyncImpl(this); + } + + void await_resume() noexcept {} + + private: + friend AsyncFileReader; + + // The parameters passed to MultiReadAsync are cached here when the caller + // calls MultiReadAsync. Later, when the execution of this awaitable is + // started, these are used to do the actual IO + AsyncFileReader& reader_; + RandomAccessFileReader* file_; + const IOOptions& opts_; + FSReadRequest* read_reqs_; + size_t num_reqs_; + autovector io_handle_; + autovector del_fn_; + std::experimental::coroutine_handle<> awaiting_coro_; + // Use this to link to the next ReadAwaiter in the suspended coroutine + // list. The head and tail of the list are tracked by AsyncFileReader. + // We use this approach rather than an STL container in order to avoid + // extra memory allocations. The coroutine call already allocates a + // ReadAwaiter object. + ReadAwaiter* next_; + }; + + // An instance of ReadOperation is returned to the caller of MultiGetAsync. + // This represents an awaitable that can be started later. + template + class ReadOperation { + public: + explicit ReadOperation(AsyncFileReader& reader, + RandomAccessFileReader* file, const IOOptions& opts, + FSReadRequest* read_reqs, size_t num_reqs, + AlignedBuf* aligned_buf) noexcept + : reader_(reader), + file_(file), + opts_(opts), + read_reqs_(read_reqs), + num_reqs_(num_reqs), + aligned_buf_(aligned_buf) {} + + auto viaIfAsync(folly::Executor::KeepAlive<> executor) const { + return folly::coro::co_viaIfAsync( + std::move(executor), + Awaiter{reader_, file_, opts_, read_reqs_, num_reqs_, aligned_buf_}); + } + + private: + AsyncFileReader& reader_; + RandomAccessFileReader* file_; + const IOOptions& opts_; + FSReadRequest* read_reqs_; + size_t num_reqs_; + AlignedBuf* aligned_buf_; + }; + + // This function does the actual work when this awaitable starts execution + bool MultiReadAsyncImpl(ReadAwaiter* awaiter); + + // Called by the SingleThreadExecutor to poll for async IO completion. + // This also resumes the awaiting coroutines. 
+ void Wait(); + + // Head of the queue of awaiters waiting for async IO completion + ReadAwaiter* head_ = nullptr; + // Tail of the awaiter queue + ReadAwaiter* tail_ = nullptr; + // Total number of pending async IOs + size_t num_reqs_ = 0; + FileSystem* fs_; + Statistics* stats_; +}; +} // namespace ROCKSDB_NAMESPACE +#endif // USE_COROUTINES diff --git a/util/coro_utils.h b/util/coro_utils.h new file mode 100644 index 0000000000..38d278000b --- /dev/null +++ b/util/coro_utils.h @@ -0,0 +1,111 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#if defined(USE_COROUTINES) +#include "folly/experimental/coro/Coroutine.h" +#include "folly/experimental/coro/Task.h" +#endif +#include "rocksdb/rocksdb_namespace.h" + +// This file has two sctions. The first section applies to all instances of +// header file inclusion and has an include guard. The second section is +// meant for multiple inclusions in the same source file, and is idempotent. +namespace ROCKSDB_NAMESPACE { + +#ifndef UTIL_CORO_UTILS_H_ +#define UTIL_CORO_UTILS_H_ + +#if defined(USE_COROUTINES) + +// The follwoing macros expand to regular and coroutine function +// declarations for a given function +#define DECLARE_SYNC_AND_ASYNC(__ret_type__, __func_name__, ...) \ + __ret_type__ __func_name__(__VA_ARGS__); \ + folly::coro::Task<__ret_type__> __func_name__##Coroutine(__VA_ARGS__); + +#define DECLARE_SYNC_AND_ASYNC_OVERRIDE(__ret_type__, __func_name__, ...) \ + __ret_type__ __func_name__(__VA_ARGS__) override; \ + folly::coro::Task<__ret_type__> __func_name__##Coroutine(__VA_ARGS__) \ + override; + +#define DECLARE_SYNC_AND_ASYNC_CONST(__ret_type__, __func_name__, ...) \ + __ret_type__ __func_name__(__VA_ARGS__) const; \ + folly::coro::Task<__ret_type__> __func_name__##Coroutine(__VA_ARGS__) const; + +constexpr bool using_coroutines() { return true; } +#else // !USE_COROUTINES + +// The follwoing macros expand to a regular function declaration for a given +// function +#define DECLARE_SYNC_AND_ASYNC(__ret_type__, __func_name__, ...) \ + __ret_type__ __func_name__(__VA_ARGS__); + +#define DECLARE_SYNC_AND_ASYNC_OVERRIDE(__ret_type__, __func_name__, ...) \ + __ret_type__ __func_name__(__VA_ARGS__) override; + +#define DECLARE_SYNC_AND_ASYNC_CONST(__ret_type__, __func_name__, ...) \ + __ret_type__ __func_name__(__VA_ARGS__) const; + +constexpr bool using_coroutines() { return false; } +#endif // USE_COROUTINES +#endif // UTIL_CORO_UTILS_H_ + +// The following section of the file is meant to be included twice in a +// source file - once defining WITH_COROUTINES and once defining +// WITHOUT_COROUTINES +#undef DEFINE_SYNC_AND_ASYNC +#undef CO_AWAIT +#undef CO_RETURN + +#if defined(WITH_COROUTINES) && defined(USE_COROUTINES) + +// This macro should be used in the beginning of the function +// definition. The declaration should have been done using one of the +// DECLARE_SYNC_AND_ASYNC* macros. It expands to the return type and +// the function name with the Coroutine suffix. For example - +// DEFINE_SYNC_AND_ASYNC(int, foo)(bool bar) {} +// would expand to - +// folly::coro::Task fooCoroutine(bool bar) {} +#define DEFINE_SYNC_AND_ASYNC(__ret_type__, __func_name__) \ + folly::coro::Task<__ret_type__> __func_name__##Coroutine + +// This macro should be used to call a function that might be a +// coroutine. 
It expands to the correct function name and prefixes +// the co_await operator if necessary. For example - +// s = CO_AWAIT(foo)(true); +// if the code is compiled WITH_COROUTINES, would expand to +// s = co_await fooCoroutine(true); +// if compiled WITHOUT_COROUTINES, would expand to +// s = foo(true); +#define CO_AWAIT(__func_name__) co_await __func_name__##Coroutine + +#define CO_RETURN co_return + +#elif defined(WITHOUT_COROUTINES) + +// This macro should be used in the beginning of the function +// definition. The declaration should have been done using one of the +// DECLARE_SYNC_AND_ASYNC* macros. It expands to the return type and +// the function name without the Coroutine suffix. For example - +// DEFINE_SYNC_AND_ASYNC(int, foo)(bool bar) {} +// would expand to - +// int foo(bool bar) {} +#define DEFINE_SYNC_AND_ASYNC(__ret_type__, __func_name__) \ + __ret_type__ __func_name__ + +// This macro should be used to call a function that might be a +// coroutine. It expands to the correct function name and prefixes +// the co_await operator if necessary. For example - +// s = CO_AWAIT(foo)(true); +// if the code is compiled WITH_COROUTINES, would expand to +// s = co_await fooCoroutine(true); +// if compiled WITHOUT_COROUTINES, would expand to +// s = foo(true); +#define CO_AWAIT(__func_name__) __func_name__ + +#define CO_RETURN return + +#endif // DO_NOT_USE_COROUTINES +} // namespace ROCKSDB_NAMESPACE diff --git a/util/single_thread_executor.h b/util/single_thread_executor.h new file mode 100644 index 0000000000..b82ddda2bf --- /dev/null +++ b/util/single_thread_executor.h @@ -0,0 +1,55 @@ +// Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#pragma once + +#if USE_COROUTINES +#include + +#include "folly/CPortability.h" +#include "folly/CppAttributes.h" +#include "folly/Executor.h" +#include "util/async_file_reader.h" + +namespace ROCKSDB_NAMESPACE { +// Implements a simple executor that runs callback functions in the same +// thread, unlike CPUThreadExecutor which may schedule the callback on +// another thread. Runs in a tight loop calling the queued callbacks, +// and polls for async IO completions when idle. The completions will +// resume suspended coroutines and they get added to the queue, which +// will get picked up by this loop. +// Any possibility of deadlock is precluded because the file system +// guarantees that async IO completion callbacks will not be scheduled +// to run in this thread or this executor. +class SingleThreadExecutor : public folly::Executor { + public: + explicit SingleThreadExecutor(AsyncFileReader& reader) + : reader_(reader), busy_(false) {} + + void add(folly::Func callback) override { + auto& q = q_; + q.push(std::move(callback)); + if (q.size() == 1 && !busy_) { + while (!q.empty()) { + q.front()(); + q.pop(); + + if (q.empty()) { + // Prevent recursion, as the Wait may queue resumed coroutines + busy_ = true; + reader_.Wait(); + busy_ = false; + } + } + } + } + + private: + std::queue q_; + AsyncFileReader& reader_; + bool busy_; +}; +} // namespace ROCKSDB_NAMESPACE +#endif // USE_COROUTINES
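The second half of coro_utils.h is written so one function body can be compiled twice — once as a plain function and once as a coroutine — by including it with different macro settings. The two-file sketch below shows only the mechanics of that double expansion; the file names (add_impl.h, main.cc) are hypothetical, and the "coroutine" flavor here is just a renamed synchronous function standing in for the folly::coro::Task version.

// add_impl.h -- shared body, written once, intentionally without an
// include guard so it can be expanded twice.
DEFINE_SYNC_AND_ASYNC(int, Add)(int a, int b) {
  CO_RETURN a + b;
}

// main.cc -- includes the body twice with different macro settings.
#include <iostream>

// Expansion 1: the "without coroutines" flavor -> int Add(int, int)
#define DEFINE_SYNC_AND_ASYNC(ret, name) ret name
#define CO_RETURN return
#include "add_impl.h"
#undef DEFINE_SYNC_AND_ASYNC
#undef CO_RETURN

// Expansion 2: the "with coroutines" flavor -> AddCoroutine(int, int).
// Here it is still a plain function; the real macros would emit
// folly::coro::Task<int> AddCoroutine(...) and use co_return.
#define DEFINE_SYNC_AND_ASYNC(ret, name) ret name##Coroutine
#define CO_RETURN return
#include "add_impl.h"
#undef DEFINE_SYNC_AND_ASYNC
#undef CO_RETURN

int main() {
  std::cout << Add(2, 3) << " " << AddCoroutine(2, 3) << "\n";  // prints 5 5
  return 0;
}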