From 522de4f59e6314698286cf29d8a325a284d81778 Mon Sep 17 00:00:00 2001 From: Marton Trencseni Date: Thu, 17 Mar 2016 22:40:01 +0000 Subject: [PATCH] Adding pin_l0_filter_and_index_blocks_in_cache feature. Summary: When a block based table file is opened, if prefetch_index_and_filter is true, it will prefetch the index and filter blocks, putting them into the block cache. What this feature adds: when a L0 block based table file is opened, if pin_l0_filter_and_index_blocks_in_cache is true in the options (and prefetch_index_and_filter is true), then the filter and index blocks aren't released back to the block cache at the end of BlockBasedTableReader::Open(). Instead the table reader takes ownership of them, hence pinning them, ie. the LRU cache will never push them out. Meanwhile in the table reader, further accesses will not hit the block cache, thus avoiding lock contention. When the table reader is destroyed, it releases the pinned blocks (if there were any). This has to happen before the cache is destroyed, so I had to introduce a TableReader::Close(), to guarantee the order of destruction. Test Plan: Added two unit tests for this. Existing unit tests run fine (default is pin_l0_filter_and_index_blocks_in_cache=false). DISABLE_JEMALLOC=1 OPT=-g make all valgrind_check -j32 Mac: OK. Linux: with D55287 patched in it's OK. 
Reviewers: sdong Reviewed By: sdong Subscribers: andrewkr, leveldb, dhruba Differential Revision: https://reviews.facebook.net/D54801 --- db/builder.cc | 5 +- db/builder.h | 2 +- db/c.cc | 5 + db/column_family.h | 2 + db/db_test.cc | 86 ++++++++++++++++ db/flush_job.cc | 16 +-- db/table_cache.cc | 21 ++-- db/table_cache.h | 11 +- db/version_set.cc | 33 ++++-- examples/rocksdb_option_file_example.ini | 1 + include/rocksdb/c.h | 3 + include/rocksdb/table.h | 6 ++ java/rocksjni/table.cc | 5 +- table/block_based_table_factory.cc | 12 ++- table/block_based_table_reader.cc | 122 +++++++++++++++++------ table/block_based_table_reader.h | 9 +- table/table_builder.h | 7 +- table/table_reader.h | 2 + table/table_test.cc | 2 +- tools/benchmark.sh | 1 + tools/db_bench_tool.cc | 5 + util/options_helper.h | 4 + util/options_test.cc | 4 +- util/testutil.cc | 1 + 24 files changed, 295 insertions(+), 70 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index ae60150033..317c9b0544 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -63,7 +63,7 @@ Status BuildTable( const CompressionType compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority, - TableProperties* table_properties) { + TableProperties* table_properties, int level) { // Reports the IOStats for flush for every following bytes. const size_t kReportFlushIOStatsEvery = 1048576; Status s; @@ -149,7 +149,8 @@ Status BuildTable( ReadOptions(), env_options, internal_comparator, meta->fd, nullptr, (internal_stats == nullptr) ? 
nullptr : internal_stats->GetFileReadHist(0), - false)); + false /* for_compaction */, nullptr /* arena */, + false /* skip_filter */, level)); s = it->status(); if (s.ok() && paranoid_file_checks) { for (it->SeekToFirst(); it->Valid(); it->Next()) { diff --git a/db/builder.h b/db/builder.h index b4b72b7d70..1eba6da9c4 100644 --- a/db/builder.h +++ b/db/builder.h @@ -61,6 +61,6 @@ extern Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, const Env::IOPriority io_priority = Env::IO_HIGH, - TableProperties* table_properties = nullptr); + TableProperties* table_properties = nullptr, int level = -1); } // namespace rocksdb diff --git a/db/c.cc b/db/c.cc index 85e9114919..9f49aba231 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1288,6 +1288,11 @@ void rocksdb_block_based_options_set_cache_index_and_filter_blocks( options->rep.cache_index_and_filter_blocks = v; } +void rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache( + rocksdb_block_based_table_options_t* options, unsigned char v) { + options->rep.pin_l0_filter_and_index_blocks_in_cache = v; +} + void rocksdb_block_based_options_set_skip_table_builder_flush( rocksdb_block_based_table_options_t* options, unsigned char v) { options->rep.skip_table_builder_flush = v; diff --git a/db/column_family.h b/db/column_family.h index ce5f409c4c..1a4036e608 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -465,6 +465,8 @@ class ColumnFamilySet { // Don't call while iterating over ColumnFamilySet void FreeDeadColumnFamilies(); + Cache* get_table_cache() { return table_cache_; } + private: friend class ColumnFamilyData; // helper function that gets called from cfd destructor diff --git a/db/db_test.cc b/db/db_test.cc index 4b42296c98..aeb097f76f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -424,6 +424,92 @@ TEST_F(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) { TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); } 
+TEST_F(DBTest, IndexAndFilterBlocksOfNewTableAddedToCacheWithPinning) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + + ASSERT_OK(Put(1, "key", "val")); + // Create a new table. + ASSERT_OK(Flush(1)); + + // index/filter blocks added to block cache right after table creation. + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // only index/filter were added + ASSERT_EQ(2, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_DATA_MISS)); + + std::string value; + // Miss and hit count should remain the same, they're all pinned. + db_->KeyMayExist(ReadOptions(), handles_[1], "key", &value); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // Miss and hit count should remain the same, they're all pinned. 
+ value = Get(1, "key"); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); +} + +TEST_F(DBTest, MultiLevelIndexAndFilterBlocksCachedWithPinning) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.statistics = rocksdb::CreateDBStatistics(); + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.filter_policy.reset(NewBloomFilterPolicy(20)); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + + Put(1, "a", "begin"); + Put(1, "z", "end"); + ASSERT_OK(Flush(1)); + // move this table to L1 + dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); + TryReopenWithColumnFamilies({"default", "pikachu"}, options); + // create new table at L0 + Put(1, "a2", "begin2"); + Put(1, "z2", "end2"); + ASSERT_OK(Flush(1)); + + // get base cache values + uint64_t fm = TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS); + uint64_t fh = TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT); + uint64_t im = TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS); + uint64_t ih = TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT); + + std::string value; + // this should be read from L0 + // so cache values don't change + value = Get(1, "a2"); + ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(fh, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); + ASSERT_EQ(im, TestGetTickerCount(options, BLOCK_CACHE_INDEX_MISS)); + ASSERT_EQ(ih, TestGetTickerCount(options, BLOCK_CACHE_INDEX_HIT)); + + // should be read from L1; the block cache survives the reopen, and during + // the BlockBasedTableReader::Open() of the 
table we try to fetch it, we + // will see one hit from there, and then the Get() results in another hit + value = Get(1, "a"); + ASSERT_EQ(fm, TestGetTickerCount(options, BLOCK_CACHE_FILTER_MISS)); + ASSERT_EQ(fh + 2, TestGetTickerCount(options, BLOCK_CACHE_FILTER_HIT)); +} + TEST_F(DBTest, ParanoidFileChecks) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/flush_job.cc b/db/flush_job.cc index b83f9dbe69..b4e5b307f8 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -234,14 +234,14 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); - s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_, - cfd_->table_cache(), iter.get(), meta, - cfd_->internal_comparator(), - cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), - existing_snapshots_, earliest_write_conflict_snapshot_, - output_compression_, cfd_->ioptions()->compression_opts, - mutable_cf_options_.paranoid_file_checks, - cfd_->internal_stats(), Env::IO_HIGH, &table_properties_); + s = BuildTable( + dbname_, db_options_.env, *cfd_->ioptions(), env_options_, + cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(), + cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), + existing_snapshots_, earliest_write_conflict_snapshot_, + output_compression_, cfd_->ioptions()->compression_opts, + mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), + Env::IO_HIGH, &table_properties_, 0 /* level */); info.table_properties = table_properties_; LogFlush(db_options_.info_log); } diff --git a/db/table_cache.cc b/db/table_cache.cc index 2a4621b7e1..be6b5c3247 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -88,7 +88,7 @@ Status TableCache::GetTableReader( const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* 
file_read_hist, - unique_ptr* table_reader, bool skip_filters) { + unique_ptr* table_reader, bool skip_filters, int level) { std::string fname = TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr file; @@ -109,7 +109,7 @@ Status TableCache::GetTableReader( file_read_hist)); s = ioptions_.table_factory->NewTableReader( TableReaderOptions(ioptions_, env_options, internal_comparator, - skip_filters), + skip_filters, level), std::move(file_reader), fd.GetFileSize(), table_reader); TEST_SYNC_POINT("TableCache::GetTableReader:0"); } @@ -120,7 +120,8 @@ Status TableCache::FindTable(const EnvOptions& env_options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, Cache::Handle** handle, const bool no_io, bool record_read_stats, - HistogramImpl* file_read_hist, bool skip_filters) { + HistogramImpl* file_read_hist, bool skip_filters, + int level) { PERF_TIMER_GUARD(find_table_nanos); Status s; uint64_t number = fd.GetNumber(); @@ -136,7 +137,7 @@ Status TableCache::FindTable(const EnvOptions& env_options, unique_ptr table_reader; s = GetTableReader(env_options, internal_comparator, fd, false /* sequential mode */, record_read_stats, - file_read_hist, &table_reader, skip_filters); + file_read_hist, &table_reader, skip_filters, level); if (!s.ok()) { assert(table_reader == nullptr); RecordTick(ioptions_.statistics, NO_FILE_ERRORS); @@ -158,7 +159,7 @@ InternalIterator* TableCache::NewIterator( const ReadOptions& options, const EnvOptions& env_options, const InternalKeyComparator& icomparator, const FileDescriptor& fd, TableReader** table_reader_ptr, HistogramImpl* file_read_hist, - bool for_compaction, Arena* arena, bool skip_filters) { + bool for_compaction, Arena* arena, bool skip_filters, int level) { PERF_TIMER_GUARD(new_table_iterator_nanos); if (table_reader_ptr != nullptr) { @@ -173,7 +174,8 @@ InternalIterator* TableCache::NewIterator( unique_ptr table_reader_unique_ptr; Status s = GetTableReader( env_options, 
icomparator, fd, /* sequential mode */ true, - /* record stats */ false, nullptr, &table_reader_unique_ptr); + /* record stats */ false, nullptr, &table_reader_unique_ptr, + false /* skip_filters */, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); } @@ -184,7 +186,7 @@ InternalIterator* TableCache::NewIterator( Status s = FindTable(env_options, icomparator, fd, &handle, options.read_tier == kBlockCacheTier /* no_io */, !for_compaction /* record read_stats */, - file_read_hist, skip_filters); + file_read_hist, skip_filters, level); if (!s.ok()) { return NewErrorInternalIterator(s, arena); } @@ -216,7 +218,7 @@ Status TableCache::Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist, - bool skip_filters) { + bool skip_filters, int level) { TableReader* t = fd.table_reader; Status s; Cache::Handle* handle = nullptr; @@ -265,7 +267,8 @@ Status TableCache::Get(const ReadOptions& options, if (!t) { s = FindTable(env_options_, internal_comparator, fd, &handle, options.read_tier == kBlockCacheTier /* no_io */, - true /* record_read_stats */, file_read_hist, skip_filters); + true /* record_read_stats */, file_read_hist, skip_filters, + level); if (s.ok()) { t = GetTableReaderFromHandle(handle); } diff --git a/db/table_cache.h b/db/table_cache.h index f8416e0b4f..499b9dbe58 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -45,34 +45,37 @@ class TableCache { // the cache and should not be deleted, and is valid for as long as the // returned iterator is live. 
// @param skip_filters Disables loading/accessing the filter block + // @param level The level this table is at, -1 for "not set / don't know" InternalIterator* NewIterator( const ReadOptions& options, const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, TableReader** table_reader_ptr = nullptr, HistogramImpl* file_read_hist = nullptr, bool for_compaction = false, - Arena* arena = nullptr, bool skip_filters = false); + Arena* arena = nullptr, bool skip_filters = false, int level = -1); // If a seek to internal key "k" in specified file finds an entry, // call (*handle_result)(arg, found_key, found_value) repeatedly until // it returns false. // @param skip_filters Disables loading/accessing the filter block + // @param level The level this table is at, -1 for "not set / don't know" Status Get(const ReadOptions& options, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, const Slice& k, GetContext* get_context, HistogramImpl* file_read_hist = nullptr, - bool skip_filters = false); + bool skip_filters = false, int level = -1); // Evict any entry for the specified file number static void Evict(Cache* cache, uint64_t file_number); // Find table reader // @param skip_filters Disables loading/accessing the filter block + // @param level == -1 means not specified Status FindTable(const EnvOptions& toptions, const InternalKeyComparator& internal_comparator, const FileDescriptor& file_fd, Cache::Handle**, const bool no_io = false, bool record_read_stats = true, HistogramImpl* file_read_hist = nullptr, - bool skip_filters = false); + bool skip_filters = false, int level = -1); // Get TableReader from a cache handle. 
TableReader* GetTableReaderFromHandle(Cache::Handle* handle); @@ -106,7 +109,7 @@ class TableCache { const FileDescriptor& fd, bool sequential_mode, bool record_read_stats, HistogramImpl* file_read_hist, unique_ptr* table_reader, - bool skip_filters = false); + bool skip_filters = false, int level = -1); const ImmutableCFOptions& ioptions_; const EnvOptions& env_options_; diff --git a/db/version_set.cc b/db/version_set.cc index 1da4475401..167586d712 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -91,6 +91,7 @@ class FilePicker { const InternalKeyComparator* internal_comparator) : num_levels_(num_levels), curr_level_(static_cast(-1)), + returned_file_level_(static_cast(-1)), hit_file_level_(static_cast(-1)), search_left_bound_(0), search_right_bound_(FileIndexer::kLevelMaxIndex), @@ -117,6 +118,8 @@ class FilePicker { } } + int GetCurrentLevel() { return returned_file_level_; } + FdWithKeyRange* GetNextFile() { while (!search_ended_) { // Loops over different levels. while (curr_index_in_curr_level_ < curr_file_level_->num_files) { @@ -189,6 +192,7 @@ class FilePicker { } prev_file_ = f; #endif + returned_file_level_ = curr_level_; if (curr_level_ > 0 && cmp_largest < 0) { // No more files to search in this level. 
search_ended_ = !PrepareNextLevel(); @@ -215,6 +219,7 @@ class FilePicker { private: unsigned int num_levels_; unsigned int curr_level_; + unsigned int returned_file_level_; unsigned int hit_file_level_; int32_t search_left_bound_; int32_t search_right_bound_; @@ -485,7 +490,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { const EnvOptions& env_options, const InternalKeyComparator& icomparator, HistogramImpl* file_read_hist, bool for_compaction, - bool prefix_enabled, bool skip_filters) + bool prefix_enabled, bool skip_filters, int level) : TwoLevelIteratorState(prefix_enabled), table_cache_(table_cache), read_options_(read_options), @@ -493,7 +498,8 @@ class LevelFileIteratorState : public TwoLevelIteratorState { icomparator_(icomparator), file_read_hist_(file_read_hist), for_compaction_(for_compaction), - skip_filters_(skip_filters) {} + skip_filters_(skip_filters), + level_(level) {} InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { if (meta_handle.size() != sizeof(FileDescriptor)) { @@ -505,7 +511,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { return table_cache_->NewIterator( read_options_, env_options_, icomparator_, *fd, nullptr /* don't need reference to table*/, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_); + for_compaction_, nullptr /* arena */, skip_filters_, level_); } } @@ -521,6 +527,7 @@ class LevelFileIteratorState : public TwoLevelIteratorState { HistogramImpl* file_read_hist_; bool for_compaction_; bool skip_filters_; + int level_; }; // A wrapper of version builder which references the current version in @@ -788,7 +795,8 @@ void Version::AddIterators(const ReadOptions& read_options, const auto& file = storage_info_.LevelFilesBrief(0).files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, - cfd_->internal_stats()->GetFileReadHist(0), false, arena)); + 
cfd_->internal_stats()->GetFileReadHist(0), false, arena, + false /* skip_filters */, 0 /* level */)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -803,7 +811,7 @@ void Version::AddIterators(const ReadOptions& read_options, cfd_->internal_stats()->GetFileReadHist(level), false /* for_compaction */, cfd_->ioptions()->prefix_extractor != nullptr, - IsFilterSkipped(level)); + IsFilterSkipped(level), level); mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); auto* first_level_iter = new (mem) LevelFileNumIterator( cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level)); @@ -908,7 +916,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, read_options, *internal_comparator(), f->fd, ikey, &get_context, cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), IsFilterSkipped(static_cast(fp.GetHitFileLevel()), - fp.IsHitFileLastInLevel())); + fp.IsHitFileLastInLevel()), + fp.GetCurrentLevel()); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -2054,9 +2063,16 @@ VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, env_options_(storage_options), env_options_compactions_(env_options_) {} +void CloseTables(void* ptr, size_t) { + TableReader* table_reader = reinterpret_cast(ptr); + table_reader->Close(); +} + VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on // VersionSet + column_family_set_->get_table_cache()->ApplyToAllCacheEntries(&CloseTables, + false); column_family_set_.reset(); for (auto file : obsolete_files_) { delete file; @@ -3267,7 +3283,8 @@ InternalIterator* VersionSet::MakeInputIterator(Compaction* c) { read_options, env_options_compactions_, cfd->internal_comparator(), flevel->files[i].fd, nullptr, nullptr, /* no per level latency histogram*/ - true /* for compaction */); + true /* for_compaction */, nullptr /* arena */, + false /* skip_filters */, (int)which /* level 
*/); } } else { // Create concatenating iterator for the files from this level @@ -3277,7 +3294,7 @@ InternalIterator* VersionSet::MakeInputIterator(Compaction* c) { cfd->internal_comparator(), nullptr /* no per level latency histogram */, true /* for_compaction */, false /* prefix enabled */, - false /* skip_filters */), + false /* skip_filters */, (int)which /* level */), new LevelFileNumIterator(cfd->internal_comparator(), c->input_levels(which))); } diff --git a/examples/rocksdb_option_file_example.ini b/examples/rocksdb_option_file_example.ini index 838afe8ebe..7dc0704298 100644 --- a/examples/rocksdb_option_file_example.ini +++ b/examples/rocksdb_option_file_example.ini @@ -138,6 +138,7 @@ block_size=8192 block_restart_interval=16 cache_index_and_filter_blocks=false + pin_l0_filter_and_index_blocks_in_cache=false index_type=kBinarySearch hash_index_allow_collision=true flush_block_policy_factory=FlushBlockBySizePolicyFactory diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 4bb870e202..6e52d20afe 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -451,6 +451,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_cache_index_and_filter_blocks( rocksdb_block_based_table_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void +rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache( + rocksdb_block_based_table_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void rocksdb_block_based_options_set_skip_table_builder_flush( rocksdb_block_based_table_options_t* options, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_block_based_table_factory( diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index cb4d850e8f..8aba3a153a 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -64,6 +64,12 @@ struct BlockBasedTableOptions { // block during table initialization. 
bool cache_index_and_filter_blocks = false; + // if cache_index_and_filter_blocks is true and the below is true, then + // filter and index blocks are stored in the cache, but a reference is + // held in the "table reader" object so the blocks are pinned and only + // evicted from cache when the table reader is freed. + bool pin_l0_filter_and_index_blocks_in_cache = false; + // The index type that will be used for this table. enum IndexType : char { // A space efficient index block that is optimized for diff --git a/java/rocksjni/table.cc b/java/rocksjni/table.cc index 97aef98883..204d1ba38f 100644 --- a/java/rocksjni/table.cc +++ b/java/rocksjni/table.cc @@ -38,13 +38,14 @@ jlong Java_org_rocksdb_PlainTableConfig_newTableFactoryHandle( /* * Class: org_rocksdb_BlockBasedTableConfig * Method: newTableFactoryHandle - * Signature: (ZJIJIIZIZZJIBBI)J + * Signature: (ZJIJIIZIZZZJIBBI)J */ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size, jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation, jint block_restart_interval, jboolean whole_key_filtering, jlong jfilterPolicy, jboolean cache_index_and_filter_blocks, + jboolean pin_l0_filter_and_index_blocks_in_cache, jboolean hash_index_allow_collision, jlong block_cache_compressed_size, jint block_cache_compressd_num_shard_bits, jbyte jchecksum_type, jbyte jindex_type, jint jformat_version) { @@ -70,6 +71,8 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( options.filter_policy = *pFilterPolicy; } options.cache_index_and_filter_blocks = cache_index_and_filter_blocks; + options.pin_l0_filter_and_index_blocks_in_cache = + pin_l0_filter_and_index_blocks_in_cache; options.hash_index_allow_collision = hash_index_allow_collision; if (block_cache_compressed_size > 0) { if (block_cache_compressd_num_shard_bits > 0) { diff --git a/table/block_based_table_factory.cc 
b/table/block_based_table_factory.cc index 75917232df..c2617b168f 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -64,7 +64,7 @@ Status BlockBasedTableFactory::NewTableReader( table_reader_options.ioptions, table_reader_options.env_options, table_options_, table_reader_options.internal_comparator, std::move(file), file_size, table_reader, prefetch_enabled, - table_reader_options.skip_filters); + table_reader_options.skip_filters, table_reader_options.level); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( @@ -94,6 +94,12 @@ Status BlockBasedTableFactory::SanitizeOptions( return Status::InvalidArgument("Enable cache_index_and_filter_blocks, " ", but block cache is disabled"); } + if (table_options_.pin_l0_filter_and_index_blocks_in_cache && + table_options_.no_block_cache) { + return Status::InvalidArgument( + "Enable pin_l0_filter_and_index_blocks_in_cache, " + ", but block cache is disabled"); + } if (!BlockBasedTableSupportedVersion(table_options_.format_version)) { return Status::InvalidArgument( "Unsupported BlockBasedTable format_version. 
Please check " @@ -115,6 +121,10 @@ std::string BlockBasedTableFactory::GetPrintableTableOptions() const { snprintf(buffer, kBufferSize, " cache_index_and_filter_blocks: %d\n", table_options_.cache_index_and_filter_blocks); ret.append(buffer); + snprintf(buffer, kBufferSize, + " pin_l0_filter_and_index_blocks_in_cache: %d\n", + table_options_.pin_l0_filter_and_index_blocks_in_cache); + ret.append(buffer); snprintf(buffer, kBufferSize, " index_type: %d\n", table_options_.index_type); ret.append(buffer); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index e48eea6947..0f9cf185c0 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -340,6 +340,28 @@ class HashIndexReader : public IndexReader { BlockContents prefixes_contents_; }; +// CachableEntry represents the entries that *may* be fetched from block cache. +// field `value` is the item we want to get. +// field `cache_handle` is the cache handle to the block cache. If the value +// was not read from cache, `cache_handle` will be nullptr. +template +struct BlockBasedTable::CachableEntry { + CachableEntry(TValue* _value, Cache::Handle* _cache_handle) + : value(_value), cache_handle(_cache_handle) {} + CachableEntry() : CachableEntry(nullptr, nullptr) {} + void Release(Cache* cache) { + if (cache_handle) { + cache->Release(cache_handle); + value = nullptr; + cache_handle = nullptr; + } + } + bool IsSet() const { return cache_handle != nullptr; } + + TValue* value = nullptr; + // if the entry is from the cache, cache_handle will be populated. + Cache::Handle* cache_handle = nullptr; +}; struct BlockBasedTable::Rep { Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, @@ -394,34 +416,21 @@ struct BlockBasedTable::Rep { // and compatible with existing code, we introduce a wrapper that allows // block to extract prefix without knowing if a key is internal or not. 
unique_ptr internal_prefix_transform; + + // only used in level 0 files: + // when pin_l0_filter_and_index_blocks_in_cache is true, we do use the + // LRU cache, but we always keep the filter & index block's handle checked + // out here (=we don't call Release()), plus the parsed out objects + // the LRU cache will never push them out, hence they're pinned + CachableEntry filter_entry; + CachableEntry index_entry; }; BlockBasedTable::~BlockBasedTable() { + Close(); delete rep_; } -// CachableEntry represents the entries that *may* be fetched from block cache. -// field `value` is the item we want to get. -// field `cache_handle` is the cache handle to the block cache. If the value -// was not read from cache, `cache_handle` will be nullptr. -template -struct BlockBasedTable::CachableEntry { - CachableEntry(TValue* _value, Cache::Handle* _cache_handle) - : value(_value), cache_handle(_cache_handle) {} - CachableEntry() : CachableEntry(nullptr, nullptr) {} - void Release(Cache* cache) { - if (cache_handle) { - cache->Release(cache_handle); - value = nullptr; - cache_handle = nullptr; - } - } - - TValue* value = nullptr; - // if the entry is from the cache, cache_handle will be populated. - Cache::Handle* cache_handle = nullptr; -}; - // Helper function to setup the cache key's prefix for the Table.
void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { assert(kMaxCacheKeyPrefixSize >= 10); @@ -498,7 +507,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, uint64_t file_size, unique_ptr* table_reader, const bool prefetch_index_and_filter, - const bool skip_filters) { + const bool skip_filters, const int level) { table_reader->reset(); Footer footer; @@ -594,14 +603,33 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, assert(table_options.block_cache != nullptr); // Hack: Call NewIndexIterator() to implicitly add index to the // block_cache + + // if pin_l0_filter_and_index_blocks_in_cache is true and this is + // a level0 file, then we will pass in this pointer to rep->index + // to NewIndexIterator(), which will save the index block in there + // else it's a nullptr and nothing special happens + CachableEntry* index_entry = nullptr; + if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0) { + index_entry = &rep->index_entry; + } unique_ptr iter( - new_table->NewIndexIterator(ReadOptions())); + new_table->NewIndexIterator(ReadOptions(), nullptr, index_entry)); s = iter->status(); if (s.ok()) { // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = new_table->GetFilter(); - filter_entry.Release(table_options.block_cache.get()); + // if pin_l0_filter_and_index_blocks_in_cache is true, and this is + // a level0 file, then save it in rep_->filter_entry; it will be + // released in the destructor only, hence it will be pinned in the + // cache as long as this reader is alive + if (rep->table_options.pin_l0_filter_and_index_blocks_in_cache && + level == 0) { + rep->filter_entry = filter_entry; + } else { + filter_entry.Release(table_options.block_cache.get()); + } } } else { // If we don't use block cache for index/filter blocks access, we'll @@ -886,6 +914,11 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( return {rep_->filter.get(), nullptr 
/* cache handle */}; } + // we have a pinned filter block + if (rep_->filter_entry.IsSet()) { + return rep_->filter_entry; + } + PERF_TIMER_GUARD(read_filter_block_nanos); Cache* block_cache = rep_->table_options.block_cache.get(); @@ -935,12 +968,19 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( } InternalIterator* BlockBasedTable::NewIndexIterator( - const ReadOptions& read_options, BlockIter* input_iter) { + const ReadOptions& read_options, BlockIter* input_iter, + CachableEntry* index_entry) { // index reader has already been pre-populated. if (rep_->index_reader) { return rep_->index_reader->NewIterator( input_iter, read_options.total_order_seek); } + // we have a pinned index block + if (rep_->index_entry.IsSet()) { + return rep_->index_entry.value->NewIterator(input_iter, + read_options.total_order_seek); + } + PERF_TIMER_GUARD(read_index_block_nanos); bool no_io = read_options.read_tier == kBlockCacheTier; @@ -996,7 +1036,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator( assert(cache_handle); auto* iter = index_reader->NewIterator( input_iter, read_options.total_order_seek); - iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + + // the caller would like to take ownership of the index block + // don't call RegisterCleanup() in this case, the caller will take care of it + if (index_entry != nullptr) { + *index_entry = {index_reader, cache_handle}; + } else { + iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); + } + return iter; } @@ -1224,7 +1272,13 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { RecordTick(statistics, BLOOM_FILTER_PREFIX_USEFUL); } - filter_entry.Release(rep_->table_options.block_cache.get()); + // if rep_->filter_entry is not set, we should call Release(); otherwise + // don't call, in this case we have a local copy in rep_->filter_entry, + // it's pinned to the cache and will be released in the destructor + if (!rep_->filter_entry.IsSet()) { + 
filter_entry.Release(rep_->table_options.block_cache.get()); + } + return may_match; } @@ -1324,7 +1378,12 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, } } - filter_entry.Release(rep_->table_options.block_cache.get()); + // if rep_->filter_entry is not set, we should call Release(); otherwise + // don't call, in this case we have a local copy in rep_->filter_entry, + // it's pinned to the cache and will be released in the destructor + if (!rep_->filter_entry.IsSet()) { + filter_entry.Release(rep_->table_options.block_cache.get()); + } return s; } @@ -1612,6 +1671,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { return s; } +void BlockBasedTable::Close() { + rep_->filter_entry.Release(rep_->table_options.block_cache.get()); + rep_->index_entry.Release(rep_->table_options.block_cache.get()); +} + Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) { out_file->Append( "Index Details:\n" diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 600ca18a37..6a88d9d9a7 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -76,7 +76,7 @@ class BlockBasedTable : public TableReader { unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader, bool prefetch_index_and_filter = true, - bool skip_filters = false); + bool skip_filters = false, int level = -1); bool PrefixMayMatch(const Slice& internal_key); @@ -119,6 +119,8 @@ class BlockBasedTable : public TableReader { // convert SST file to a human readable form Status DumpTable(WritableFile* out_file) override; + void Close() override; + ~BlockBasedTable(); bool TEST_filter_block_preloaded() const; @@ -155,8 +157,9 @@ class BlockBasedTable : public TableReader { // 2. index is not present in block cache. // 3. 
We disallowed any io to be performed, that is, read_options == // kBlockCacheTier - InternalIterator* NewIndexIterator(const ReadOptions& read_options, - BlockIter* input_iter = nullptr); + InternalIterator* NewIndexIterator( + const ReadOptions& read_options, BlockIter* input_iter = nullptr, + CachableEntry* index_entry = nullptr); // Read block cache from block caches (if set): block_cache and // block_cache_compressed. diff --git a/table/table_builder.h b/table/table_builder.h index ed79bed0ea..274245f083 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -29,17 +29,20 @@ struct TableReaderOptions { TableReaderOptions(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options, const InternalKeyComparator& _internal_comparator, - bool _skip_filters = false) + bool _skip_filters = false, int _level = -1) : ioptions(_ioptions), env_options(_env_options), internal_comparator(_internal_comparator), - skip_filters(_skip_filters) {} + skip_filters(_skip_filters), + level(_level) {} const ImmutableCFOptions& ioptions; const EnvOptions& env_options; const InternalKeyComparator& internal_comparator; // This is only used for BlockBasedTable (reader) bool skip_filters; + // what level this table/file is on, -1 for "not set, don't know" + int level; }; struct TableBuilderOptions { diff --git a/table/table_reader.h b/table/table_reader.h index 5751ab03f9..c047bf8cb2 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -91,6 +91,8 @@ class TableReader { virtual Status DumpTable(WritableFile* out_file) { return Status::NotSupported("DumpTable() not supported"); } + + virtual void Close() {} }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index 3cc7d0dc7f..424ca005ea 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1715,7 +1715,7 @@ TEST_F(BlockBasedTableTest, FilterBlockInBlockCache) { ImmutableCFOptions ioptions3(options); // Generate table without filter policy c3.Finish(options, ioptions3, 
table_options, - GetPlainInternalComparator(options.comparator), &keys, &kvmap); + GetPlainInternalComparator(options.comparator), &keys, &kvmap); // Open table with filter policy table_options.filter_policy.reset(NewBloomFilterPolicy(1)); options.table_factory.reset(new BlockBasedTableFactory(table_options)); diff --git a/tools/benchmark.sh b/tools/benchmark.sh index e4729e7f3f..d28aeb271a 100755 --- a/tools/benchmark.sh +++ b/tools/benchmark.sh @@ -74,6 +74,7 @@ const_params=" --level_compaction_dynamic_level_bytes=true \ --bytes_per_sync=$((8 * M)) \ --cache_index_and_filter_blocks=0 \ + --pin_l0_filter_and_index_blocks_in_cache=1 \ --benchmark_write_rate_limit=$(( 1024 * 1024 * $mb_written_per_sec )) \ \ --hard_rate_limit=3 \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index e3e11e17b5..2e1a832370 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -340,6 +340,9 @@ DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed" DEFINE_bool(cache_index_and_filter_blocks, false, "Cache index/filter blocks in block cache."); +DEFINE_bool(pin_l0_filter_and_index_blocks_in_cache, false, + "Pin index/filter blocks of L0 files in block cache."); + DEFINE_int32(block_size, static_cast(rocksdb::BlockBasedTableOptions().block_size), "Number of bytes in a block."); @@ -2511,6 +2514,8 @@ class Benchmark { } block_based_options.cache_index_and_filter_blocks = FLAGS_cache_index_and_filter_blocks; + block_based_options.pin_l0_filter_and_index_blocks_in_cache = + FLAGS_pin_l0_filter_and_index_blocks_in_cache; block_based_options.block_cache = cache_; block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_size = FLAGS_block_size; diff --git a/util/options_helper.h b/util/options_helper.h index b0864442c4..5c33e36ff4 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -491,6 +491,10 @@ static std::unordered_mapUniform(2); + opt.pin_l0_filter_and_index_blocks_in_cache = 
rnd->Uniform(2); opt.index_type = rnd->Uniform(2) ? BlockBasedTableOptions::kBinarySearch : BlockBasedTableOptions::kHashSearch; opt.hash_index_allow_collision = rnd->Uniform(2);