diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc
index 355f7d736a..b0880d516a 100644
--- a/table/block_fetcher.cc
+++ b/table/block_fetcher.cc
@@ -99,10 +99,28 @@ inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() {
 
 inline void BlockFetcher::PrepareBufferForBlockFromFile() {
   // cache miss read from device
-  if (do_uncompress_ &&
+  if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
       block_size_with_trailer_ < kDefaultStackBufferSize) {
     // If we've got a small enough hunk of data, read it in to the
     // trivially allocated stack buffer instead of needing a full malloc()
+    //
+    // `GetBlockContents()` cannot return this data as its lifetime is tied to
+    // this `BlockFetcher`'s lifetime. That is fine because this is only used
+    // in cases where we do not expect the `GetBlockContents()` result to be the
+    // same buffer we are assigning here. If we guess incorrectly, there will be
+    // a heap allocation and memcpy in `GetBlockContents()` to obtain the final
+    // result. Considering we are eliding a heap allocation here by using the
+    // stack buffer, the cost of guessing incorrectly here is one extra memcpy.
+    //
+    // When `do_uncompress_` is true, we expect the uncompression step will
+    // allocate heap memory for the final result. However this expectation will
+    // be wrong if the block turns out to already be uncompressed, which we
+    // won't know for sure until after reading it.
+    //
+    // When `ioptions_.allow_mmap_reads` is true, we do not expect the file
+    // reader to use the scratch buffer at all, but instead return a pointer
+    // into the mapped memory. This expectation will be wrong when using a
+    // file reader that does not implement mmap reads properly.
     used_buf_ = &stack_buf_[0];
   } else if (maybe_compressed_ && !do_uncompress_) {
     compressed_buf_ = AllocateBlock(block_size_with_trailer_,
@@ -226,11 +244,11 @@ Status BlockFetcher::ReadBlockContents() {
                           &slice_, used_buf_, nullptr, for_compaction_);
     PERF_COUNTER_ADD(block_read_count, 1);
 #ifndef NDEBUG
-    if (used_buf_ == &stack_buf_[0]) {
+    if (slice_.data() == &stack_buf_[0]) {
       num_stack_buf_memcpy_++;
-    } else if (used_buf_ == heap_buf_.get()) {
+    } else if (slice_.data() == heap_buf_.get()) {
       num_heap_buf_memcpy_++;
-    } else if (used_buf_ == compressed_buf_.get()) {
+    } else if (slice_.data() == compressed_buf_.get()) {
       num_compressed_buf_memcpy_++;
     }
 #endif
diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc
index 04bf8186c5..6d1cd5ddd8 100644
--- a/table/block_fetcher_test.cc
+++ b/table/block_fetcher_test.cc
@@ -42,14 +42,14 @@ class CountedMemoryAllocator : public MemoryAllocator {
 };
 
 struct MemcpyStats {
-  int num_stack_buf_memcpy = 0;
-  int num_heap_buf_memcpy = 0;
-  int num_compressed_buf_memcpy = 0;
+  int num_stack_buf_memcpy;
+  int num_heap_buf_memcpy;
+  int num_compressed_buf_memcpy;
 };
 
 struct BufAllocationStats {
-  int num_heap_buf_allocations = 0;
-  int num_compressed_buf_allocations = 0;
+  int num_heap_buf_allocations;
+  int num_compressed_buf_allocations;
 };
 
 struct TestStats {
@@ -58,6 +58,17 @@ struct TestStats {
 };
 
 class BlockFetcherTest : public testing::Test {
+ public:
+  enum class Mode {
+    kBufferedRead = 0,
+    kBufferedMmap,
+    kDirectRead,
+    kNumModes,
+  };
+  // use NumModes as array size to avoid "size of array '...' has non-integral
+  // type" errors.
+  const static int NumModes = static_cast<int>(Mode::kNumModes);
+
  protected:
   void SetUp() override {
     test::SetupSyncPointsToMockDirectIO();
@@ -69,9 +80,8 @@ class BlockFetcherTest : public testing::Test {
 
   void TearDown() override { EXPECT_OK(test::DestroyDir(env_, test_dir_)); }
 
-  void AssertSameBlock(const BlockContents& block1,
-                       const BlockContents& block2) {
-    ASSERT_EQ(block1.data.ToString(), block2.data.ToString());
+  void AssertSameBlock(const std::string& block1, const std::string& block2) {
+    ASSERT_EQ(block1, block2);
   }
 
   // Creates a table with kv pairs (i, i) where i ranges from 0 to 9, inclusive.
@@ -81,10 +91,9 @@ class BlockFetcherTest : public testing::Test {
     NewFileWriter(table_name, &writer);
 
     // Create table builder.
-    Options options;
-    ImmutableCFOptions ioptions(options);
-    InternalKeyComparator comparator(options.comparator);
-    ColumnFamilyOptions cf_options;
+    ImmutableCFOptions ioptions(options_);
+    InternalKeyComparator comparator(options_.comparator);
+    ColumnFamilyOptions cf_options(options_);
     MutableCFOptions moptions(cf_options);
     std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
     std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
@@ -103,12 +112,12 @@ class BlockFetcherTest : public testing::Test {
     ASSERT_OK(table_builder->Finish());
   }
 
-  void FetchIndexBlock(const std::string& table_name, bool use_direct_io,
+  void FetchIndexBlock(const std::string& table_name,
                        CountedMemoryAllocator* heap_buf_allocator,
                        CountedMemoryAllocator* compressed_buf_allocator,
-                       MemcpyStats* memcpy_stats, BlockContents* index_block) {
-    FileOptions fopt;
-    fopt.use_direct_reads = use_direct_io;
+                       MemcpyStats* memcpy_stats, BlockContents* index_block,
+                       std::string* result) {
+    FileOptions fopt(options_);
     std::unique_ptr<RandomAccessFileReader> file;
     NewFileReader(table_name, fopt, &file);
 
@@ -123,6 +132,7 @@ class BlockFetcherTest : public testing::Test {
                heap_buf_allocator, compressed_buf_allocator, index_block,
                memcpy_stats, &compression_type);
     ASSERT_EQ(compression_type, CompressionType::kNoCompression);
+    result->assign(index_block->data.ToString());
   }
 
   // Fetches the first data block in both direct IO and non-direct IO mode.
@@ -134,10 +144,9 @@ class BlockFetcherTest : public testing::Test {
   // Expects:
   // Block contents are the same.
   // Bufferr allocation and memory copy statistics are expected.
-  void TestFetchDataBlock(const std::string& table_name_prefix, bool compressed,
-                          bool do_uncompress,
-                          const TestStats& expected_non_direct_io_stats,
-                          const TestStats& expected_direct_io_stats) {
+  void TestFetchDataBlock(
+      const std::string& table_name_prefix, bool compressed, bool do_uncompress,
+      std::array<TestStats, NumModes> expected_stats_by_mode) {
     for (CompressionType compression_type : GetSupportedCompressions()) {
       bool do_compress = compression_type != kNoCompression;
       if (compressed != do_compress) continue;
@@ -150,60 +159,80 @@ class BlockFetcherTest : public testing::Test {
                                     table_name, compression_type);
       CreateTable(table_name, compression_type);
 
       CompressionType expected_compression_type_after_fetch =
           (compressed && !do_uncompress) ?
              compression_type : kNoCompression;
 
-      BlockContents blocks[2];
-      MemcpyStats memcpy_stats[2];
-      CountedMemoryAllocator heap_buf_allocators[2];
-      CountedMemoryAllocator compressed_buf_allocators[2];
-      for (bool use_direct_io : {false, true}) {
-        FetchFirstDataBlock(
-            table_name, use_direct_io, compressed, do_uncompress,
-            expected_compression_type_after_fetch,
-            &heap_buf_allocators[use_direct_io],
-            &compressed_buf_allocators[use_direct_io], &blocks[use_direct_io],
-            &memcpy_stats[use_direct_io]);
+      BlockContents blocks[NumModes];
+      std::string block_datas[NumModes];
+      MemcpyStats memcpy_stats[NumModes];
+      CountedMemoryAllocator heap_buf_allocators[NumModes];
+      CountedMemoryAllocator compressed_buf_allocators[NumModes];
+      for (int i = 0; i < NumModes; ++i) {
+        SetMode(static_cast<Mode>(i));
+        FetchFirstDataBlock(table_name, compressed, do_uncompress,
+                            expected_compression_type_after_fetch,
+                            &heap_buf_allocators[i],
+                            &compressed_buf_allocators[i], &blocks[i],
+                            &block_datas[i], &memcpy_stats[i]);
       }
-      AssertSameBlock(blocks[0], blocks[1]);
+      for (int i = 0; i < NumModes - 1; ++i) {
+        AssertSameBlock(block_datas[i], block_datas[i + 1]);
+      }
 
       // Check memcpy and buffer allocation statistics.
-      for (bool use_direct_io : {false, true}) {
-        const TestStats& expected_stats = use_direct_io
-                                              ? expected_direct_io_stats
-                                              : expected_non_direct_io_stats;
+      for (int i = 0; i < NumModes; ++i) {
+        const TestStats& expected_stats = expected_stats_by_mode[i];
 
-        ASSERT_EQ(memcpy_stats[use_direct_io].num_stack_buf_memcpy,
+        ASSERT_EQ(memcpy_stats[i].num_stack_buf_memcpy,
                   expected_stats.memcpy_stats.num_stack_buf_memcpy);
-        ASSERT_EQ(memcpy_stats[use_direct_io].num_heap_buf_memcpy,
+        ASSERT_EQ(memcpy_stats[i].num_heap_buf_memcpy,
                   expected_stats.memcpy_stats.num_heap_buf_memcpy);
-        ASSERT_EQ(memcpy_stats[use_direct_io].num_compressed_buf_memcpy,
+        ASSERT_EQ(memcpy_stats[i].num_compressed_buf_memcpy,
                   expected_stats.memcpy_stats.num_compressed_buf_memcpy);
-        ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumAllocations(),
+        ASSERT_EQ(heap_buf_allocators[i].GetNumAllocations(),
                   expected_stats.buf_allocation_stats.num_heap_buf_allocations);
         ASSERT_EQ(
-            compressed_buf_allocators[use_direct_io].GetNumAllocations(),
+            compressed_buf_allocators[i].GetNumAllocations(),
             expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
 
         // The allocated buffers are not deallocated until
         // the block content is deleted.
-        ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumDeallocations(), 0);
-        ASSERT_EQ(
-            compressed_buf_allocators[use_direct_io].GetNumDeallocations(), 0);
-        blocks[use_direct_io].allocation.reset();
-        ASSERT_EQ(heap_buf_allocators[use_direct_io].GetNumDeallocations(),
+        ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(), 0);
+        ASSERT_EQ(compressed_buf_allocators[i].GetNumDeallocations(), 0);
+        blocks[i].allocation.reset();
+        ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(),
                   expected_stats.buf_allocation_stats.num_heap_buf_allocations);
         ASSERT_EQ(
-            compressed_buf_allocators[use_direct_io].GetNumDeallocations(),
+            compressed_buf_allocators[i].GetNumDeallocations(),
             expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
       }
     }
   }
 
+  void SetMode(Mode mode) {
+    switch (mode) {
+      case Mode::kBufferedRead:
+        options_.use_direct_reads = false;
+        options_.allow_mmap_reads = false;
+        break;
+      case Mode::kBufferedMmap:
+        options_.use_direct_reads = false;
+        options_.allow_mmap_reads = true;
+        break;
+      case Mode::kDirectRead:
+        options_.use_direct_reads = true;
+        options_.allow_mmap_reads = false;
+        break;
+      case Mode::kNumModes:
+        assert(false);
+    }
+  }
+
  private:
   std::string test_dir_;
   Env* env_;
   std::shared_ptr<FileSystem> fs_;
   BlockBasedTableFactory table_factory_;
+  Options options_;
 
   std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
@@ -274,8 +303,7 @@ class BlockFetcherTest : public testing::Test {
                          MemoryAllocator* compressed_buf_allocator,
                          BlockContents* contents, MemcpyStats* stats,
                          CompressionType* compresstion_type) {
-    Options options;
-    ImmutableCFOptions ioptions(options);
+    ImmutableCFOptions ioptions(options_);
     ReadOptions roptions;
     PersistentCacheOptions persistent_cache_options;
     Footer footer;
@@ -299,18 +327,16 @@ class BlockFetcherTest : public testing::Test {
   // NOTE: expected_compression_type is the expected compression
   // type of the fetched block content, if the block is uncompressed,
   // then the expected compression type is kNoCompression.
-  void FetchFirstDataBlock(const std::string& table_name, bool use_direct_io,
-                           bool compressed, bool do_uncompress,
+  void FetchFirstDataBlock(const std::string& table_name, bool compressed,
+                           bool do_uncompress,
                            CompressionType expected_compression_type,
                            MemoryAllocator* heap_buf_allocator,
                            MemoryAllocator* compressed_buf_allocator,
-                           BlockContents* block, MemcpyStats* memcpy_stats) {
-    Options options;
-    ImmutableCFOptions ioptions(options);
-    InternalKeyComparator comparator(options.comparator);
-
-    FileOptions foptions;
-    foptions.use_direct_reads = use_direct_io;
+                           BlockContents* block, std::string* result,
+                           MemcpyStats* memcpy_stats) {
+    ImmutableCFOptions ioptions(options_);
+    InternalKeyComparator comparator(options_.comparator);
+    FileOptions foptions(options_);
 
     // Get block handle for the first data block.
     std::unique_ptr<BlockBasedTable> table;
@@ -339,6 +365,7 @@ class BlockFetcherTest : public testing::Test {
                 do_uncompress, heap_buf_allocator, compressed_buf_allocator,
                 block, memcpy_stats, &compression_type);
     ASSERT_EQ(compression_type, expected_compression_type);
+    result->assign(block->data.ToString());
   }
 };
 
@@ -356,33 +383,53 @@ TEST_F(BlockFetcherTest, FetchIndexBlock) {
     CountedMemoryAllocator allocator;
     MemcpyStats memcpy_stats;
 
-    BlockContents indexes[2];
-    for (bool use_direct_io : {false, true}) {
-      FetchIndexBlock(table_name, use_direct_io, &allocator, &allocator,
-                      &memcpy_stats, &indexes[use_direct_io]);
+    BlockContents indexes[NumModes];
+    std::string index_datas[NumModes];
+    for (int i = 0; i < NumModes; ++i) {
+      SetMode(static_cast<Mode>(i));
+      FetchIndexBlock(table_name, &allocator, &allocator, &memcpy_stats,
+                      &indexes[i], &index_datas[i]);
+    }
+    for (int i = 0; i < NumModes - 1; ++i) {
+      AssertSameBlock(index_datas[i], index_datas[i + 1]);
     }
-    AssertSameBlock(indexes[0], indexes[1]);
   }
 }
 
 // Data blocks are not compressed,
-// fetch data block under both direct IO and non-direct IO.
+// fetch data block under direct IO, mmap IO, and non-direct IO.
 // Expects:
 // 1. in non-direct IO mode, allocate a heap buffer and memcpy the block
 //    into the buffer;
 // 2. in direct IO mode, allocate a heap buffer and memcpy from the
 //    direct IO buffer to the heap buffer.
 TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
-  MemcpyStats memcpy_stats;
-  memcpy_stats.num_heap_buf_memcpy = 1;
-
-  BufAllocationStats buf_allocation_stats;
-  buf_allocation_stats.num_heap_buf_allocations = 1;
-
-  TestStats expected_stats{memcpy_stats, buf_allocation_stats};
-
-  TestFetchDataBlock("FetchUncompressedDataBlock", false, false, expected_stats,
-                     expected_stats);
+  TestStats expected_non_mmap_stats = {
+      {
+          0 /* num_stack_buf_memcpy */,
+          1 /* num_heap_buf_memcpy */,
+          0 /* num_compressed_buf_memcpy */,
+      },
+      {
+          1 /* num_heap_buf_allocations */,
+          0 /* num_compressed_buf_allocations */,
+      }};
+  TestStats expected_mmap_stats = {{
+                                       0 /* num_stack_buf_memcpy */,
+                                       0 /* num_heap_buf_memcpy */,
+                                       0 /* num_compressed_buf_memcpy */,
+                                   },
+                                   {
+                                       0 /* num_heap_buf_allocations */,
+                                       0 /* num_compressed_buf_allocations */,
+                                   }};
+  std::array<TestStats, NumModes> expected_stats_by_mode{{
+      expected_non_mmap_stats /* kBufferedRead */,
+      expected_mmap_stats /* kBufferedMmap */,
+      expected_non_mmap_stats /* kDirectRead */,
+  }};
+  TestFetchDataBlock("FetchUncompressedDataBlock", false, false,
+                     expected_stats_by_mode);
 }
 
 // Data blocks are compressed,
@@ -394,16 +441,32 @@ TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
 // 2. in direct IO mode, allocate a compressed buffer and memcpy from the
 //    direct IO buffer to the compressed buffer.
 TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
-  MemcpyStats memcpy_stats;
-  memcpy_stats.num_compressed_buf_memcpy = 1;
-
-  BufAllocationStats buf_allocation_stats;
-  buf_allocation_stats.num_compressed_buf_allocations = 1;
-
-  TestStats expected_stats{memcpy_stats, buf_allocation_stats};
-
-  TestFetchDataBlock("FetchCompressedDataBlock", true, false, expected_stats,
-                     expected_stats);
+  TestStats expected_non_mmap_stats = {
+      {
+          0 /* num_stack_buf_memcpy */,
+          0 /* num_heap_buf_memcpy */,
+          1 /* num_compressed_buf_memcpy */,
+      },
+      {
+          0 /* num_heap_buf_allocations */,
+          1 /* num_compressed_buf_allocations */,
+      }};
+  TestStats expected_mmap_stats = {{
+                                       0 /* num_stack_buf_memcpy */,
+                                       0 /* num_heap_buf_memcpy */,
+                                       0 /* num_compressed_buf_memcpy */,
+                                   },
+                                   {
+                                       0 /* num_heap_buf_allocations */,
+                                       0 /* num_compressed_buf_allocations */,
+                                   }};
+  std::array<TestStats, NumModes> expected_stats_by_mode{{
+      expected_non_mmap_stats /* kBufferedRead */,
+      expected_mmap_stats /* kBufferedMmap */,
+      expected_non_mmap_stats /* kDirectRead */,
+  }};
+  TestFetchDataBlock("FetchCompressedDataBlock", true, false,
+                     expected_stats_by_mode);
 }
 
 // Data blocks are compressed,
@@ -415,32 +478,42 @@ TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
 // 2. in direct IO mode mode, allocate a heap buffer, then directly uncompress
 //    and memcpy from the direct IO buffer to the heap buffer.
 TEST_F(BlockFetcherTest, FetchAndUncompressCompressedDataBlock) {
-  TestStats expected_non_direct_io_stats;
-  {
-    MemcpyStats memcpy_stats;
-    memcpy_stats.num_stack_buf_memcpy = 1;
-    memcpy_stats.num_heap_buf_memcpy = 1;
-
-    BufAllocationStats buf_allocation_stats;
-    buf_allocation_stats.num_heap_buf_allocations = 1;
-    buf_allocation_stats.num_compressed_buf_allocations = 0;
-
-    expected_non_direct_io_stats = {memcpy_stats, buf_allocation_stats};
-  }
-
-  TestStats expected_direct_io_stats;
-  {
-    MemcpyStats memcpy_stats;
-    memcpy_stats.num_heap_buf_memcpy = 1;
-
-    BufAllocationStats buf_allocation_stats;
-    buf_allocation_stats.num_heap_buf_allocations = 1;
-
-    expected_direct_io_stats = {memcpy_stats, buf_allocation_stats};
-  }
-
+  TestStats expected_buffered_read_stats = {
+      {
+          1 /* num_stack_buf_memcpy */,
+          1 /* num_heap_buf_memcpy */,
+          0 /* num_compressed_buf_memcpy */,
+      },
+      {
+          1 /* num_heap_buf_allocations */,
+          0 /* num_compressed_buf_allocations */,
+      }};
+  TestStats expected_mmap_stats = {{
+                                       0 /* num_stack_buf_memcpy */,
+                                       1 /* num_heap_buf_memcpy */,
+                                       0 /* num_compressed_buf_memcpy */,
+                                   },
+                                   {
+                                       1 /* num_heap_buf_allocations */,
+                                       0 /* num_compressed_buf_allocations */,
+                                   }};
+  TestStats expected_direct_read_stats = {
+      {
+          0 /* num_stack_buf_memcpy */,
+          1 /* num_heap_buf_memcpy */,
+          0 /* num_compressed_buf_memcpy */,
+      },
+      {
+          1 /* num_heap_buf_allocations */,
+          0 /* num_compressed_buf_allocations */,
+      }};
+  std::array<TestStats, NumModes> expected_stats_by_mode{{
+      expected_buffered_read_stats,
+      expected_mmap_stats,
+      expected_direct_read_stats,
+  }};
   TestFetchDataBlock("FetchAndUncompressCompressedDataBlock", true, true,
-                     expected_non_direct_io_stats, expected_direct_io_stats);
+                     expected_stats_by_mode);
 }
 
 #endif  // ROCKSDB_LITE
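Editor's note: the sketch below is illustrative commentary on this patch, not part of the diff. It restates the buffer-selection heuristic described by the new comment in `PrepareBufferForBlockFromFile()`: prefer the stack buffer whenever the read result is expected to be transient, because a wrong guess costs one extra memcpy while a right guess elides a heap allocation. All names here (`kStackBufSize`, `BufferChoice`, `ChooseBuffer`) are hypothetical stand-ins, not RocksDB's actual `BlockFetcher` API.

#include <cstddef>
#include <cstdio>

// Stand-in for kDefaultStackBufferSize in block_fetcher.h (assumed value).
constexpr size_t kStackBufSize = 5 * 1024;

enum class BufferChoice { kStack, kCompressedHeap, kHeap };

// Mirrors the patched condition: use the stack buffer when either
// uncompression is requested (the final result will live in a separate heap
// buffer anyway) or mmap reads are enabled (the reader is expected to return
// a pointer into the mapping rather than filling the scratch buffer).
BufferChoice ChooseBuffer(bool do_uncompress, bool allow_mmap_reads,
                          bool maybe_compressed, size_t block_size) {
  if ((do_uncompress || allow_mmap_reads) && block_size < kStackBufSize) {
    return BufferChoice::kStack;
  } else if (maybe_compressed && !do_uncompress) {
    return BufferChoice::kCompressedHeap;
  }
  return BufferChoice::kHeap;
}

int main() {
  // Small block with mmap reads enabled: the stack buffer is chosen. If the
  // file reader honors mmap, no copy into it ever happens; if it does not,
  // the only penalty is one extra memcpy, as the patch comment explains.
  BufferChoice c = ChooseBuffer(/*do_uncompress=*/false,
                                /*allow_mmap_reads=*/true,
                                /*maybe_compressed=*/false,
                                /*block_size=*/4096);
  std::printf("chose stack buffer: %d\n", c == BufferChoice::kStack);
  return 0;
}

This guessing game is also why the NDEBUG counters in `ReadBlockContents()` now compare `slice_.data()` instead of `used_buf_`: with mmap reads the returned slice can point into the mapped file even though `used_buf_` was set to the stack buffer, so keying the memcpy statistics off `used_buf_` would overcount copies that never happened.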