diff --git a/db/column_family.h b/db/column_family.h index 9c415c2a84..96b08c52ea 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -166,7 +166,7 @@ class ColumnFamilyData { bool IsDropped() const { return dropped_; } // thread-safe - int NumberLevels() const { return options_.num_levels; } + int NumberLevels() const { return ioptions_.num_levels; } void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } uint64_t GetLogNumber() const { return log_number_; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 6a2daad7d2..0e47774a72 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3828,16 +3828,16 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, // not supported in lite version return nullptr; #else - auto iter = new ForwardIterator(this, read_options, cfd); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + auto iter = new ForwardIterator(this, read_options, cfd, sv); return NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter, - kMaxSequenceNumber, - cfd->options()->max_sequential_skip_in_iterations, - read_options.iterate_upper_bound); + kMaxSequenceNumber, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + read_options.iterate_upper_bound); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); - SuperVersion* sv = nullptr; - sv = cfd->GetReferencedSuperVersion(&mutex_); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); auto snapshot = read_options.snapshot != nullptr @@ -3889,7 +3889,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, // that they are likely to be in the same cache line and/or page. ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( env_, *cfd->ioptions(), cfd->user_comparator(), - snapshot, cfd->options()->max_sequential_skip_in_iterations, + snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, read_options.iterate_upper_bound); Iterator* internal_iter = @@ -3908,19 +3908,6 @@ Status DBImpl::NewIterators( std::vector* iterators) { iterators->clear(); iterators->reserve(column_families.size()); - SequenceNumber latest_snapshot = 0; - std::vector super_versions; - super_versions.reserve(column_families.size()); - - if (!read_options.tailing) { - mutex_.Lock(); - latest_snapshot = versions_->LastSequence(); - for (auto cfh : column_families) { - auto cfd = reinterpret_cast(cfh)->cfd(); - super_versions.push_back(cfd->GetSuperVersion()->Ref()); - } - mutex_.Unlock(); - } if (read_options.tailing) { #ifdef ROCKSDB_LITE @@ -3929,17 +3916,21 @@ Status DBImpl::NewIterators( #else for (auto cfh : column_families) { auto cfd = reinterpret_cast(cfh)->cfd(); - auto iter = new ForwardIterator(this, read_options, cfd); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); + auto iter = new ForwardIterator(this, read_options, cfd, sv); iterators->push_back( NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter, - kMaxSequenceNumber, - cfd->options()->max_sequential_skip_in_iterations)); + kMaxSequenceNumber, + sv->mutable_cf_options.max_sequential_skip_in_iterations)); } #endif } else { + SequenceNumber latest_snapshot = versions_->LastSequence(); + for (size_t i = 0; i < column_families.size(); ++i) { - auto cfh = reinterpret_cast(column_families[i]); - auto cfd = cfh->cfd(); + auto* cfd = reinterpret_cast( + column_families[i])->cfd(); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); auto snapshot = read_options.snapshot != nullptr @@ -3949,9 +3940,9 @@ Status DBImpl::NewIterators( ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( env_, *cfd->ioptions(), cfd->user_comparator(), snapshot, - cfd->options()->max_sequential_skip_in_iterations); + sv->mutable_cf_options.max_sequential_skip_in_iterations); Iterator* internal_iter = NewInternalIterator( - read_options, cfd, super_versions[i], db_iter->GetArena()); + read_options, cfd, sv, db_iter->GetArena()); db_iter->SetIterUnderDBIter(internal_iter); iterators->push_back(db_iter); } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 31ebdbedde..c98693d38d 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -53,7 +53,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& read_options, ? reinterpret_cast( read_options.snapshot)->number_ : latest_snapshot), - cfd->options()->max_sequential_skip_in_iterations); + super_version->mutable_cf_options.max_sequential_skip_in_iterations); auto internal_iter = NewInternalIterator( read_options, cfd, super_version, db_iter->GetArena()); db_iter->SetIterUnderDBIter(internal_iter); @@ -72,16 +72,17 @@ Status DBImplReadOnly::NewIterators( SequenceNumber latest_snapshot = versions_->LastSequence(); for (auto cfh : column_families) { - auto cfd = reinterpret_cast(cfh)->cfd(); - auto db_iter = NewArenaWrappedDbIterator( + auto* cfd = reinterpret_cast(cfh)->cfd(); + auto* sv = cfd->GetSuperVersion()->Ref(); + auto* db_iter = NewArenaWrappedDbIterator( env_, *cfd->ioptions(), cfd->user_comparator(), (read_options.snapshot != nullptr ? reinterpret_cast( read_options.snapshot)->number_ : latest_snapshot), - cfd->options()->max_sequential_skip_in_iterations); - auto internal_iter = NewInternalIterator( - read_options, cfd, cfd->GetSuperVersion()->Ref(), db_iter->GetArena()); + sv->mutable_cf_options.max_sequential_skip_in_iterations); + auto* internal_iter = NewInternalIterator( + read_options, cfd, sv, db_iter->GetArena()); db_iter->SetIterUnderDBIter(internal_iter); iterators->push_back(db_iter); } diff --git a/db/db_test.cc b/db/db_test.cc index 06d0241ee3..1ee8b58af6 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8889,6 +8889,56 @@ TEST(DBTest, DynamicCompactionOptions) { ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok()); } +TEST(DBTest, DynamicMiscOptions) { + // Test max_sequential_skip_in_iterations + Options options; + options.env = env_; + options.create_if_missing = true; + options.max_sequential_skip_in_iterations = 16; + options.compression = kNoCompression; + options.statistics = rocksdb::CreateDBStatistics(); + DestroyAndReopen(&options); + + auto assert_reseek_count = [this, &options](int key_start, int num_reseek) { + int key0 = key_start; + int key1 = key_start + 1; + int key2 = key_start + 2; + Random rnd(301); + ASSERT_OK(Put(Key(key0), RandomString(&rnd, 8))); + for (int i = 0; i < 10; ++i) { + ASSERT_OK(Put(Key(key1), RandomString(&rnd, 8))); + } + ASSERT_OK(Put(Key(key2), RandomString(&rnd, 8))); + std::unique_ptr iter(db_->NewIterator(ReadOptions())); + iter->Seek(Key(key1)); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Key(key1)), 0); + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Key(key2)), 0); + ASSERT_EQ(num_reseek, + TestGetTickerCount(options, NUMBER_OF_RESEEKS_IN_ITERATION)); + }; + // No reseek + assert_reseek_count(100, 0); + + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_sequential_skip_in_iterations", "4"} + })); + // Clear memtable and make new option effective + dbfull()->TEST_FlushMemTable(true); + // Trigger reseek + assert_reseek_count(200, 1); + + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_sequential_skip_in_iterations", "16"} + })); + // Clear memtable and make new option effective + dbfull()->TEST_FlushMemTable(true); + // No reseek + assert_reseek_count(300, 1); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 684045e056..b2e4bd0674 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -114,25 +114,29 @@ class LevelIterator : public Iterator { }; ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, - ColumnFamilyData* cfd) + ColumnFamilyData* cfd, SuperVersion* current_sv) : db_(db), read_options_(read_options), cfd_(cfd), prefix_extractor_(cfd->options()->prefix_extractor.get()), user_comparator_(cfd->user_comparator()), immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())), - sv_(nullptr), + sv_(current_sv), mutable_iter_(nullptr), current_(nullptr), valid_(false), is_prev_set_(false), - is_prev_inclusive_(false) {} - -ForwardIterator::~ForwardIterator() { - Cleanup(); + is_prev_inclusive_(false) { + if (sv_) { + RebuildIterators(false); + } } -void ForwardIterator::Cleanup() { +ForwardIterator::~ForwardIterator() { + Cleanup(true); +} + +void ForwardIterator::Cleanup(bool release_sv) { if (mutable_iter_ != nullptr) { mutable_iter_->~Iterator(); } @@ -149,15 +153,17 @@ void ForwardIterator::Cleanup() { } level_iters_.clear(); - if (sv_ != nullptr && sv_->Unref()) { - DBImpl::DeletionState deletion_state; - db_->mutex_.Lock(); - sv_->Cleanup(); - db_->FindObsoleteFiles(deletion_state, false, true); - db_->mutex_.Unlock(); - delete sv_; - if (deletion_state.HaveSomethingToDelete()) { - db_->PurgeObsoleteFiles(deletion_state); + if (release_sv) { + if (sv_ != nullptr && sv_->Unref()) { + DBImpl::DeletionState deletion_state; + db_->mutex_.Lock(); + sv_->Cleanup(); + db_->FindObsoleteFiles(deletion_state, false, true); + db_->mutex_.Unlock(); + delete sv_; + if (deletion_state.HaveSomethingToDelete()) { + db_->PurgeObsoleteFiles(deletion_state); + } } } } @@ -169,7 +175,7 @@ bool ForwardIterator::Valid() const { void ForwardIterator::SeekToFirst() { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { - RebuildIterators(); + RebuildIterators(true); } else if (status_.IsIncomplete()) { ResetIncompleteIterators(); } @@ -179,7 +185,7 @@ void ForwardIterator::SeekToFirst() { void ForwardIterator::Seek(const Slice& internal_key) { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { - RebuildIterators(); + RebuildIterators(true); } else if (status_.IsIncomplete()) { ResetIncompleteIterators(); } @@ -188,6 +194,7 @@ void ForwardIterator::Seek(const Slice& internal_key) { void ForwardIterator::SeekInternal(const Slice& internal_key, bool seek_to_first) { + assert(mutable_iter_); // mutable seek_to_first ? mutable_iter_->SeekToFirst() : mutable_iter_->Seek(internal_key); @@ -338,7 +345,7 @@ void ForwardIterator::Next() { std::string current_key = key().ToString(); Slice old_key(current_key.data(), current_key.size()); - RebuildIterators(); + RebuildIterators(true); SeekInternal(old_key, false); if (!valid_ || key().compare(old_key) != 0) { return; @@ -412,11 +419,13 @@ Status ForwardIterator::status() const { return Status::OK(); } -void ForwardIterator::RebuildIterators() { +void ForwardIterator::RebuildIterators(bool refresh_sv) { // Clean up - Cleanup(); - // New - sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + Cleanup(refresh_sv); + if (refresh_sv) { + // New + sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); + } mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); const auto& l0_files = sv_->current->files_[0]; diff --git a/db/forward_iterator.h b/db/forward_iterator.h index 4d3761ee16..537dc13520 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -51,7 +51,7 @@ typedef std::priority_queue max_bytes_for_level_multiplier_additional; + // Misc options + uint64_t max_sequential_skip_in_iterations; + // Derived options // Per-level target file size. std::vector max_file_size; diff --git a/util/options_helper.cc b/util/options_helper.cc index 2a56a1ccf1..372a7171f4 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -150,6 +150,17 @@ bool ParseCompactionOptions(const std::string& name, const std::string& value, return true; } +template +bool ParseMiscOptions(const std::string& name, const std::string& value, + OptionsType* new_options) { + if (name == "max_sequential_skip_in_iterations") { + new_options->max_sequential_skip_in_iterations = ParseUint64(value); + } else { + return false; + } + return true; +} + bool GetMutableOptionsFromStrings( const MutableCFOptions& base_options, const std::unordered_map& options_map, @@ -160,6 +171,7 @@ bool GetMutableOptionsFromStrings( for (const auto& o : options_map) { if (ParseMemtableOptions(o.first, o.second, new_options)) { } else if (ParseCompactionOptions(o.first, o.second, new_options)) { + } else if (ParseMiscOptions(o.first, o.second, new_options)) { } else { return false; } @@ -228,6 +240,7 @@ bool GetColumnFamilyOptionsFromMap( try { if (ParseMemtableOptions(o.first, o.second, new_options)) { } else if (ParseCompactionOptions(o.first, o.second, new_options)) { + } else if (ParseMiscOptions(o.first, o.second, new_options)) { } else if (o.first == "min_write_buffer_number_to_merge") { new_options->min_write_buffer_number_to_merge = ParseInt(o.second); } else if (o.first == "compression") { @@ -286,8 +299,6 @@ bool GetColumnFamilyOptionsFromMap( } else if (o.first == "compaction_options_fifo") { new_options->compaction_options_fifo.max_table_files_size = ParseUint64(o.second); - } else if (o.first == "max_sequential_skip_in_iterations") { - new_options->max_sequential_skip_in_iterations = ParseUint64(o.second); } else if (o.first == "inplace_update_support") { new_options->inplace_update_support = ParseBoolean(o.first, o.second); } else if (o.first == "inplace_update_num_locks") {