diff --git a/cache/cache_entry_roles.cc b/cache/cache_entry_roles.cc
index dbc7120639..4416b7c2f4 100644
--- a/cache/cache_entry_roles.cc
+++ b/cache/cache_entry_roles.cc
@@ -19,6 +19,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
     "IndexBlock",
     "OtherBlock",
     "WriteBuffer",
+    "CompressionDictionaryBuildingBuffer",
     "Misc",
 }};
 
@@ -30,6 +31,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
     "index-block",
     "other-block",
     "write-buffer",
+    "compression-dictionary-building-buffer",
     "misc",
 }};
 
diff --git a/cache/cache_entry_roles.h b/cache/cache_entry_roles.h
index 22148e00c4..9a6a2ad245 100644
--- a/cache/cache_entry_roles.h
+++ b/cache/cache_entry_roles.h
@@ -14,6 +14,8 @@ namespace ROCKSDB_NAMESPACE {
 
 // Classifications of block cache entries, for reporting statistics
+// Adding a new enum to this class requires corresponding updates to
+// kCacheEntryRoleToCamelString and kCacheEntryRoleToHyphenString
 enum class CacheEntryRole {
   // Block-based table data block
   kDataBlock,
@@ -29,6 +31,9 @@ enum class CacheEntryRole {
   kOtherBlock,
   // WriteBufferManager reservations to account for memtable usage
   kWriteBuffer,
+  // BlockBasedTableBuilder reservations to account for the
+  // compression dictionary building buffer's memory usage
+  kCompressionDictionaryBuildingBuffer,
   // Default bucket, for miscellaneous cache entries. Do not use for
   // entries that could potentially add up to large usage.
   kMisc,
diff --git a/cache/cache_reservation_manager.cc b/cache/cache_reservation_manager.cc
index d6f62d647e..6a00748718 100644
--- a/cache/cache_reservation_manager.cc
+++ b/cache/cache_reservation_manager.cc
@@ -69,6 +69,9 @@ Status CacheReservationManager::UpdateCacheReservation(
 // This makes it possible to keep the template definitions in the .cc file.
 template Status CacheReservationManager::UpdateCacheReservation<
     CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used);
+template Status CacheReservationManager::UpdateCacheReservation<
+    CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
+    std::size_t new_mem_used);
 // For cache reservation manager unit tests
 template Status CacheReservationManager::UpdateCacheReservation<
     CacheEntryRole::kMisc>(std::size_t new_mem_used);
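For orientation before the builder-side changes: a minimal sketch of how a CacheReservationManager is driven, using the pre-existing kMisc instantiation kept above for unit tests. This is an editor's illustration, not part of the patch; the constructor signature and Status-based protocol are taken from the hunks above, while the 256 KiB dummy-entry granularity is an assumption carried over from the kSizeDummyEntry constant in the tests further down.

// Sketch only: drives reservations against a block cache via dummy entries.
#include <memory>

#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "rocksdb/cache.h"

using namespace ROCKSDB_NAMESPACE;

void ReservationSketch() {
  std::shared_ptr<Cache> cache = NewLRUCache(8 * 1024 * 1024 /* capacity */);
  CacheReservationManager mgr(cache);

  // Report ~300 KiB of outside memory: the manager pins enough dummy
  // entries in `cache` to cover it (two entries if they are 256 KiB each).
  Status s = mgr.UpdateCacheReservation<CacheEntryRole::kMisc>(300 * 1024);

  // With strict_capacity_limit, a cache too full to take the reservation
  // surfaces as Status::Incomplete; that is the signal the builder changes
  // below react to.
  if (s.IsIncomplete()) {
    // The caller decides how to shed memory; the builder enters Unbuffered.
  }

  // Reporting zero usage releases this manager's dummy entries.
  mgr.UpdateCacheReservation<CacheEntryRole::kMisc>(0).PermitUncheckedError();
}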
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index be2cae968c..d56530965b 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -21,6 +21,8 @@
 #include <unordered_map>
 #include <utility>
 
+#include "cache/cache_entry_roles.h"
+#include "cache/cache_reservation_manager.h"
 #include "db/dbformat.h"
 #include "index_builder.h"
 #include "memory/memory_allocator.h"
@@ -312,7 +314,7 @@ struct BlockBasedTableBuilder::Rep {
   // `kBuffered` state is allowed only as long as the buffering of uncompressed
   // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
   uint64_t buffer_limit;
-
+  std::unique_ptr<CacheReservationManager> cache_rev_mng;
   const bool use_delta_encoding_for_index_values;
   std::unique_ptr<FilterBlockBuilder> filter_builder;
   char cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
@@ -444,6 +446,12 @@ struct BlockBasedTableBuilder::Rep {
       buffer_limit = std::min(tbo.target_file_size,
                               compression_opts.max_dict_buffer_bytes);
     }
+    if (table_options.no_block_cache) {
+      cache_rev_mng.reset(nullptr);
+    } else {
+      cache_rev_mng.reset(
+          new CacheReservationManager(table_options.block_cache));
+    }
     for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
       compression_ctxs[i].reset(new CompressionContext(compression_type));
     }
@@ -896,10 +904,24 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
       assert(!r->data_block.empty());
       r->first_key_in_next_block = &key;
       Flush();
+      if (r->state == Rep::State::kBuffered) {
+        bool exceeds_buffer_limit =
+            (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
+        bool is_cache_full = false;
 
-      if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
-          r->data_begin_offset > r->buffer_limit) {
-        EnterUnbuffered();
+        // Increase cache reservation for the last buffered data block
+        // only if the block is not going to be unbuffered immediately
+        // and there exists a cache reservation manager
+        if (!exceeds_buffer_limit && r->cache_rev_mng != nullptr) {
+          Status s = r->cache_rev_mng->UpdateCacheReservation<
+              CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
+              r->data_begin_offset);
+          is_cache_full = s.IsIncomplete();
+        }
+
+        if (exceeds_buffer_limit || is_cache_full) {
+          EnterUnbuffered();
+        }
       }
 
       // Add item to index block.
@@ -1910,10 +1932,16 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
                        r->pending_handle);
       }
     }
-
     std::swap(iter, next_block_iter);
   }
   r->data_block_buffers.clear();
+  r->data_begin_offset = 0;
+  if (r->cache_rev_mng != nullptr) {
+    Status s = r->cache_rev_mng->UpdateCacheReservation<
+        CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
+        r->data_begin_offset);
+    s.PermitUncheckedError();
+  }
 }
 
 Status BlockBasedTableBuilder::Finish() {
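The reservation only happens when two conditions line up: the table is built with a block cache (no_block_cache is false, so Rep::cache_rev_mng is non-null) and dictionary compression is configured so that the builder buffers data blocks at all. A sketch of the option wiring, mirroring the tests that follow; the names and sizes here are illustrative, not prescribed by the patch:

#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

using namespace ROCKSDB_NAMESPACE;

Options MakeChargedDictBufferingOptions() {
  BlockBasedTableOptions table_options;
  // Must have a cache (and no_block_cache == false, the default), otherwise
  // Rep::cache_rev_mng stays null and nothing is charged.
  table_options.block_cache = NewLRUCache(8 * 1024 * 1024 /* capacity */);

  Options options;
  options.compression = kSnappyCompression;
  // Enables dictionary compression, which makes the builder buffer data
  // blocks (the kBuffered state) so they can be sampled for the dictionary.
  options.compression_opts.max_dict_bytes = 1024;
  // Caps the buffering; this is what the new exceeds_buffer_limit check
  // in Add() compares data_begin_offset against.
  options.compression_opts.max_dict_buffer_bytes = 1024 * 1024;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  return options;
}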
diff --git a/table/table_test.cc b/table/table_test.cc
index f805418af7..ad07531863 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -7,6 +7,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
+#include <cstddef>
 #include <stddef.h>
 #include <stdio.h>
 
@@ -4746,6 +4747,239 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
   ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
 }
 
+TEST_P(
+    BlockBasedTableTest,
+    IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBuilderFinish) {
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+  constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
+  constexpr std::size_t kMaxDictBytes = 1024;
+  constexpr std::size_t kMaxDictBufferBytes = 1024;
+
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  LRUCacheOptions lo;
+  lo.capacity = kCacheCapacity;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache(NewLRUCache(lo));
+  table_options.block_cache = cache;
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+  Options options;
+  options.compression = kSnappyCompression;
+  options.compression_opts.max_dict_bytes = kMaxDictBytes;
+  options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  test::StringSink* sink = new test::StringSink();
+  std::unique_ptr<FSWritableFile> holder(sink);
+  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(holder), "test_file_name", FileOptions()));
+
+  ImmutableOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, ikc,
+                          &int_tbl_prop_collector_factories,
+                          kSnappyCompression, options.compression_opts,
+                          kUnknownColumnFamily, "test_cf", -1 /* level */),
+      file_writer.get()));
+
+  std::string key1 = "key1";
+  std::string value1 = "val1";
+  InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
+  // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+  // and therefore won't trigger any data block buffering
+  builder->Add(ik1.Encode(), value1);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  std::string key2 = "key2";
+  std::string value2 = "val2";
+  InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
+  // Adding the second key will trigger a flush of the last data block (the
+  // one containing key1 and value1) by FlushBlockEveryKeyPolicy and hence
+  // trigger buffering of that data block.
+  builder->Add(ik2.Encode(), value2);
+  // The cache reservation will increase for the last buffered data block (the
+  // one containing key1 and value1) since the buffer limit is not exceeded
+  // after that buffering and the cache will not be full after this reservation
+  EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+  EXPECT_LT(cache->GetPinnedUsage(),
+            1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  ASSERT_OK(builder->Finish());
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
+TEST_P(
+    BlockBasedTableTest,
+    IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBufferLimitExceed) {
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+  constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
+  constexpr std::size_t kMaxDictBytes = 1024;
+  constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry;
+
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  LRUCacheOptions lo;
+  lo.capacity = kCacheCapacity;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache(NewLRUCache(lo));
+  table_options.block_cache = cache;
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+  Options options;
+  options.compression = kSnappyCompression;
+  options.compression_opts.max_dict_bytes = kMaxDictBytes;
+  options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  test::StringSink* sink = new test::StringSink();
+  std::unique_ptr<FSWritableFile> holder(sink);
+  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(holder), "test_file_name", FileOptions()));
+
+  ImmutableOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, ikc,
+                          &int_tbl_prop_collector_factories,
+                          kSnappyCompression, options.compression_opts,
+                          kUnknownColumnFamily, "test_cf", -1 /* level */),
+      file_writer.get()));
+
+  std::string key1 = "key1";
+  std::string value1(kSizeDummyEntry, '0');
+  InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
+  // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+  // and therefore won't trigger any data block buffering
+  builder->Add(ik1.Encode(), value1);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  std::string key2 = "key2";
+  std::string value2(kSizeDummyEntry, '0');
+  InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
+  // Adding the second key will trigger a flush of the last data block (the
+  // one containing key1 and value1) by FlushBlockEveryKeyPolicy and hence
+  // trigger buffering of the last data block.
+  builder->Add(ik2.Encode(), value2);
+  // The cache reservation will increase for the last buffered data block (the
+  // one containing key1 and value1) since the buffer limit is not exceeded
+  // after the buffering and the cache will not be full after this reservation
+  EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry);
+  EXPECT_LT(cache->GetPinnedUsage(),
+            2 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  std::string key3 = "key3";
+  std::string value3 = "val3";
+  InternalKey ik3(key3, 2 /* sequence number */, kTypeValue);
+  // Adding the third key will trigger a flush of the last data block (the one
+  // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of the last data block.
+  builder->Add(ik3.Encode(), value3);
+  // The cache reservation will decrease since the buffer limit is now
+  // exceeded after the last buffering and EnterUnbuffered() is triggered
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  ASSERT_OK(builder->Finish());
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
+TEST_P(
+    BlockBasedTableTest,
+    IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnCacheFull) {
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+  // A small kCacheCapacity is chosen so that increasing the cache reservation
+  // for buffering two data blocks (one containing key1/value1, the other
+  // containing key2 and a big value2) will fill up the cache
+  constexpr std::size_t kCacheCapacity =
+      1 * kSizeDummyEntry + kSizeDummyEntry / 2;
+  constexpr std::size_t kMaxDictBytes = 1024;
+  // A big kMaxDictBufferBytes is chosen so that adding the big key-value pair
+  // (key2, value2) won't exceed the buffer limit
+  constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024;
+
+  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
+  LRUCacheOptions lo;
+  lo.capacity = kCacheCapacity;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache(NewLRUCache(lo));
+  table_options.block_cache = cache;
+  table_options.flush_block_policy_factory =
+      std::make_shared<FlushBlockEveryKeyPolicyFactory>();
+
+  Options options;
+  options.compression = kSnappyCompression;
+  options.compression_opts.max_dict_bytes = kMaxDictBytes;
+  options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
+  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+
+  test::StringSink* sink = new test::StringSink();
+  std::unique_ptr<FSWritableFile> holder(sink);
+  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+      std::move(holder), "test_file_name", FileOptions()));
+
+  ImmutableOptions ioptions(options);
+  MutableCFOptions moptions(options);
+  InternalKeyComparator ikc(options.comparator);
+  IntTblPropCollectorFactories int_tbl_prop_collector_factories;
+
+  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, moptions, ikc,
+                          &int_tbl_prop_collector_factories,
+                          kSnappyCompression, options.compression_opts,
+                          kUnknownColumnFamily, "test_cf", -1 /* level */),
+      file_writer.get()));
+
+  std::string key1 = "key1";
+  std::string value1 = "val1";
+  InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
+  // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
+  // and therefore won't trigger any data block buffering
+  builder->Add(ik1.Encode(), value1);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  std::string key2 = "key2";
+  std::string value2(kSizeDummyEntry, '0');
+  InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
+  // Adding the second key will trigger a flush of the last data block (the
+  // one containing key1 and value1) by FlushBlockEveryKeyPolicy and hence
+  // trigger buffering of the last data block.
+  builder->Add(ik2.Encode(), value2);
+  // The cache reservation will increase for the last buffered data block (the
+  // one containing key1 and value1) since the buffer limit is not exceeded
+  // after the buffering and the cache will not be full after this reservation
+  EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+  EXPECT_LT(cache->GetPinnedUsage(),
+            1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  std::string key3 = "key3";
+  std::string value3 = "value3";
+  InternalKey ik3(key3, 2 /* sequence number */, kTypeValue);
+  // Adding the third key will trigger a flush of the last data block (the one
+  // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
+  // buffering of the last data block.
+  builder->Add(ik3.Encode(), value3);
+  // The cache reservation will decrease since the cache is now full after
+  // increasing the reservation for the last buffered block, so
+  // EnterUnbuffered() is triggered
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+
+  ASSERT_OK(builder->Finish());
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
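One note on the GE/LT bounds used in all three tests: they are consistent with reservations being charged in whole dummy entries of kSizeDummyEntry bytes, with kMetaDataChargeOverhead absorbing the cache's per-entry metadata. A back-of-envelope helper (an editor's sketch; the ceiling rounding is inferred from the bounds above, not stated explicitly in the patch):

#include <cstddef>

constexpr std::size_t kSizeDummyEntry = 256 * 1024;

// Bytes pinned for a reservation request, assuming whole dummy entries.
constexpr std::size_t PinnedBytesFor(std::size_t bytes) {
  return ((bytes + kSizeDummyEntry - 1) / kSizeDummyEntry) * kSizeDummyEntry;
}

// A tiny buffered block (key1 plus "val1") still pins one full dummy entry,
// matching EXPECT_GE(usage, 1 * kSizeDummyEntry) in the first test.
static_assert(PinnedBytesFor(20) == 1 * kSizeDummyEntry, "");

// A ~256 KiB block (key plus a 256 KiB value) pins two entries, matching
// the EXPECT_GE(usage, 2 * kSizeDummyEntry) bound in the second test.
static_assert(PinnedBytesFor(kSizeDummyEntry + 20) == 2 * kSizeDummyEntry, "");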