diff --git a/HISTORY.md b/HISTORY.md index 64c89e8eea..d525c1170f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ### Public API Changes * Change default of BlockBasedTableOptions.format_version to 2. It means default DB created by 4.6 or up cannot be opened by RocksDB version 3.9 or earlier. * Added strict_capacity_limit option to NewLRUCache. If the flag is set to true, insert to cache will fail if no enough capacity can be free. Signiture of Cache::Insert() is updated accordingly. +* Tickers [NUMBER_DB_NEXT, NUMBER_DB_PREV, NUMBER_DB_NEXT_FOUND, NUMBER_DB_PREV_FOUND, ITER_BYTES_READ] are not updated immediately. The are updated when the Iterator is deleted. ### New Features * Add CompactionPri::kMinOverlappingRatio, a compaction picking mode friendly to write amplification. * Deprecate Iterator::IsKeyPinned() and replace it with Iterator::GetProperty() with prop_name="rocksdb.iterator.is.key.pinned" diff --git a/db/db_iter.cc b/db/db_iter.cc index 07c8d9e0f0..256b65447d 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -60,6 +60,44 @@ class DBIter: public Iterator { kReverse }; + // LocalStatistics contain Statistics counters that will be aggregated per + // each iterator instance and then will be sent to the global statistics when + // the iterator is destroyed. + // + // The purpose of this approach is to avoid perf regression happening + // when multiple threads bump the atomic counters from a DBIter::Next(). + struct LocalStatistics { + explicit LocalStatistics() { ResetCounters(); } + + void ResetCounters() { + next_count_ = 0; + next_found_count_ = 0; + prev_count_ = 0; + prev_found_count_ = 0; + bytes_read_ = 0; + } + + void BumpGlobalStatistics(Statistics* global_statistics) { + RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_); + RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_); + RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); + RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); + RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); + ResetCounters(); + } + + // Map to Tickers::NUMBER_DB_NEXT + uint64_t next_count_; + // Map to Tickers::NUMBER_DB_NEXT_FOUND + uint64_t next_found_count_; + // Map to Tickers::NUMBER_DB_PREV + uint64_t prev_count_; + // Map to Tickers::NUMBER_DB_PREV_FOUND + uint64_t prev_found_count_; + // Map to Tickers::ITER_BYTES_READ + uint64_t bytes_read_; + }; + DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, @@ -86,6 +124,7 @@ class DBIter: public Iterator { } virtual ~DBIter() { RecordTick(statistics_, NO_ITERATORS, -1); + local_stats_.BumpGlobalStatistics(statistics_); if (!arena_mode_) { delete iter_; } else { @@ -213,6 +252,7 @@ class DBIter: public Iterator { bool iter_pinned_; // List of operands for merge operator. std::deque merge_operands_; + LocalStatistics local_stats_; // No copying allowed DBIter(const DBIter&); @@ -250,6 +290,9 @@ void DBIter::Next() { PERF_COUNTER_ADD(internal_key_skipped_count, 1); } + if (statistics_ != nullptr) { + local_stats_.next_count_++; + } // Now we point to the next internal position, for both of merge and // not merge cases. if (!iter_->Valid()) { @@ -257,18 +300,15 @@ void DBIter::Next() { return; } FindNextUserEntry(true /* skipping the current user key */); - if (statistics_ != nullptr) { - RecordTick(statistics_, NUMBER_DB_NEXT); - if (valid_) { - RecordTick(statistics_, NUMBER_DB_NEXT_FOUND); - RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); - } - } if (valid_ && prefix_extractor_ && prefix_same_as_start_ && prefix_extractor_->Transform(saved_key_.GetKey()) .compare(prefix_start_.GetKey()) != 0) { valid_ = false; } + if (statistics_ != nullptr && valid_) { + local_stats_.next_found_count_++; + local_stats_.bytes_read_ += (key().size() + value().size()); + } } // PRE: saved_key_ has the current user key if skipping @@ -436,10 +476,10 @@ void DBIter::Prev() { } PrevInternal(); if (statistics_ != nullptr) { - RecordTick(statistics_, NUMBER_DB_PREV); + local_stats_.prev_count_++; if (valid_) { - RecordTick(statistics_, NUMBER_DB_PREV_FOUND); - RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); + local_stats_.prev_found_count_++; + local_stats_.bytes_read_ += (key().size() + value().size()); } } if (valid_ && prefix_extractor_ && prefix_same_as_start_ && diff --git a/db/db_test.cc b/db/db_test.cc index d68087d13c..4b42296c98 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10611,6 +10611,88 @@ TEST_F(DBTest, PrefixExtractorBlockFilter) { delete iter; } +TEST_F(DBTest, IteratorWithLocalStatistics) { + Options options = CurrentOptions(); + options.statistics = rocksdb::CreateDBStatistics(); + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 1000; i++) { + // Key 10 bytes / Value 10 bytes + ASSERT_OK(Put(RandomString(&rnd, 10), RandomString(&rnd, 10))); + } + + std::atomic total_next(0); + std::atomic total_next_found(0); + std::atomic total_prev(0); + std::atomic total_prev_found(0); + std::atomic total_bytes(0); + + std::vector threads; + std::function reader_func_next = [&]() { + Iterator* iter = db_->NewIterator(ReadOptions()); + + iter->SeekToFirst(); + // Seek will bump ITER_BYTES_READ + total_bytes += iter->key().size(); + total_bytes += iter->value().size(); + while (true) { + iter->Next(); + total_next++; + + if (!iter->Valid()) { + break; + } + total_next_found++; + total_bytes += iter->key().size(); + total_bytes += iter->value().size(); + } + + delete iter; + }; + + std::function reader_func_prev = [&]() { + Iterator* iter = db_->NewIterator(ReadOptions()); + + iter->SeekToLast(); + // Seek will bump ITER_BYTES_READ + total_bytes += iter->key().size(); + total_bytes += iter->value().size(); + while (true) { + iter->Prev(); + total_prev++; + + if (!iter->Valid()) { + break; + } + total_prev_found++; + total_bytes += iter->key().size(); + total_bytes += iter->value().size(); + } + + delete iter; + }; + + for (int i = 0; i < 10; i++) { + threads.emplace_back(reader_func_next); + } + for (int i = 0; i < 15; i++) { + threads.emplace_back(reader_func_prev); + } + + for (auto& t : threads) { + t.join(); + } + + ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT), total_next); + ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_NEXT_FOUND), + total_next_found); + ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV), total_prev); + ASSERT_EQ(TestGetTickerCount(options, NUMBER_DB_PREV_FOUND), + total_prev_found); + ASSERT_EQ(TestGetTickerCount(options, ITER_BYTES_READ), total_bytes); +} + #ifndef ROCKSDB_LITE class BloomStatsTestWithParam : public DBTest,