rocksdb/db/arena_wrapped_db_iter.cc
leipeng 68ce5d84f6 Add new Iterator API Refresh(const snapshot*) (#10594)
Summary:
This PR resolves https://github.com/facebook/rocksdb/issues/10487 and https://github.com/facebook/rocksdb/issues/10536; with this change, user code needs to call Refresh() periodically.

The main code change is to support range deletions. A range tombstone iterator uses a sequence number as upper bound to decide which range tombstones are effective. During Iterator refresh, this sequence number upper bound needs to be updated for all range tombstone iterators under DBIter and LevelIterator. LevelIterator may create new table iterators and range tombstone iterator during scanning, so it needs to be aware of iterator refresh. The code path that propagates this change is `db_iter_->set_sequence(read_seq)  -> MergingIterator::SetRangeDelReadSeqno() -> TruncatedRangeDelIterator::SetRangeDelReadSeqno() and LevelIterator::SetRangeDelReadSeqno()`.

This change also fixes an issue where range tombstone iterators created by LevelIterator may access ReadOptions::snapshot, even though we do not explicitly require users to keep a snapshot alive after creating an Iterator.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10594

Test Plan:
* New unit tests.
* Add Iterator::Refresh(snapshot) to stress test. Note that this change only adds tests for refreshing to the same snapshot since this is the main target use case.

TODO in a following PR:
* Stress test Iterator::Refresh() to different snapshots or no snapshot.

Reviewed By: ajkr

Differential Revision: D48456896

Pulled By: cbi42

fbshipit-source-id: 2e642c04e91235cc9542ef4cd37b3c20823bd779
2023-09-15 10:44:43 -07:00

179 lines
7.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/arena_wrapped_db_iter.h"
#include "memory/arena.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "table/internal_iterator.h"
#include "table/iterator_wrapper.h"
#include "util/user_comparator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
// Picks the sequence number a read should use: the snapshot's sequence
// number when a snapshot is supplied, otherwise the DB's latest sequence
// number.
inline static SequenceNumber GetSeqNum(const DBImpl* db, const Snapshot* s) {
  if (s == nullptr) {
    return db->GetLatestSequenceNumber();
  }
  return s->GetSequenceNumber();
}
// Looks up an iterator property by name and writes its value into *prop.
//
// Every property is forwarded to the wrapped DBIter. The only special case is
// "rocksdb.iterator.super-version-number": if the wrapped iterator cannot
// answer it, the super version number recorded by this wrapper is reported
// instead, and the call always succeeds.
Status ArenaWrappedDBIter::GetProperty(std::string prop_name,
                                       std::string* prop) {
  if (prop_name != "rocksdb.iterator.super-version-number") {
    return db_iter_->GetProperty(prop_name, prop);
  }
  // Prefer the value supplied by the inner iterator; fall back to our own
  // bookkeeping only when the inner lookup fails.
  const bool answered_by_inner = db_iter_->GetProperty(prop_name, prop).ok();
  if (!answered_by_inner) {
    *prop = std::to_string(sv_number_);
  }
  return Status::OK();
}
// Constructs the wrapped DBIter inside this object's arena and records the
// state needed later: the super version number, a private copy of the read
// options, and whether refresh is allowed. The internal iterator itself is
// not created here (nullptr is passed to DBIter).
void ArenaWrappedDBIter::Init(
    Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, const Version* version,
    const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iteration,
    uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
    ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
  // The DBIter must be built before read_options_ is touched below:
  // `read_options` may refer to read_options_ itself.
  auto* storage = arena_.AllocateAligned(sizeof(DBIter));
  db_iter_ = new (storage)
      DBIter(env, read_options, ioptions, mutable_cf_options,
             ioptions.user_comparator, /* iter */ nullptr, version, sequence,
             true, max_sequential_skip_in_iteration, read_callback, db_impl,
             cfd, expose_blob_index);

  memtable_range_tombstone_iter_ = nullptr;
  allow_refresh_ = allow_refresh;
  sv_number_ = version_number;
  read_options_ = read_options;

  // Turn async IO off in our copy of the options when the file system does
  // not advertise support for it.
  const bool fs_supports_async_io = CheckFSFeatureSupport(
      env->GetFileSystem().get(), FSSupportedOps::kAsyncIO);
  if (!fs_supports_async_io) {
    read_options_.async_io = false;
  }
}
// Refreshes the iterator without a snapshot; simply forwards to
// Refresh(const Snapshot*) with a null snapshot.
Status ArenaWrappedDBIter::Refresh() {
  const Snapshot* const no_snapshot = nullptr;
  return Refresh(no_snapshot);
}
// Re-targets this iterator at the read sequence implied by `snapshot`, or at
// the DB's latest sequence number when `snapshot` is nullptr.
//
// Returns NotSupported when cfd_ or db_impl_ is null or allow_refresh_ is
// false; returns OK otherwise.
//
// Two paths are possible:
//  * Re-init: when the column family's super version number differs from
//    sv_number_ (or the mutable memtable gained range tombstones that the
//    current iterator tree has no slot for), the DBIter and the arena are
//    destroyed and rebuilt from a freshly referenced SuperVersion.
//  * In place: otherwise only the read sequence is updated, the memtable
//    range tombstone iterator is replaced (unless range deletions are
//    ignored), and the DBIter is marked invalid.
Status ArenaWrappedDBIter::Refresh(const Snapshot* snapshot) {
  if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) {
    return Status::NotSupported("Creating renew iterator is not allowed.");
  }
  assert(db_iter_ != nullptr);
  // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
  // correct behavior. Will be corrected automatically when we take a snapshot
  // here for the case of WritePreparedTxnDB.
  uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
  // If we recreate a new internal iterator below (NewInternalIterator()),
  // we will pass in read_options_. We need to make sure it
  // has the right snapshot.
  read_options_.snapshot = snapshot;
  TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
  TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");

  // Tears down the DBIter and its arena, then rebuilds both on top of a
  // newly referenced SuperVersion.
  auto reinit_internal_iter = [&]() {
    Env* env = db_iter_->env();
    db_iter_->~DBIter();
    arena_.~Arena();
    new (&arena_) Arena();

    SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
    SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
    if (read_callback_) {
      read_callback_->Refresh(read_seq);
    }
    // Record the version number of the SuperVersion actually referenced
    // above. cur_sv_number was sampled earlier, and the SuperVersion may have
    // changed between that read and GetReferencedSuperVersion(); storing the
    // sampled value could leave sv_number_ out of sync with the SuperVersion
    // this iterator really reads from, which would mislead the staleness
    // check on the next Refresh() and the super-version-number property.
    Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
         sv->current, read_seq,
         sv->mutable_cf_options.max_sequential_skip_in_iterations,
         sv->version_number, read_callback_, db_impl_, cfd_,
         expose_blob_index_, allow_refresh_);

    InternalIterator* internal_iter = db_impl_->NewInternalIterator(
        read_options_, cfd_, sv, &arena_, read_seq,
        /* allow_unprepared_value */ true, /* db_iter */ this);
    SetIterUnderDBIter(internal_iter);
  };

  while (true) {
    if (sv_number_ != cur_sv_number) {
      reinit_internal_iter();
      break;
    } else {
      SequenceNumber read_seq = GetSeqNum(db_impl_, snapshot);
      // Refresh range-tombstones in MemTable
      if (!read_options_.ignore_range_deletions) {
        SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
        TEST_SYNC_POINT_CALLBACK("ArenaWrappedDBIter::Refresh:SV", nullptr);
        auto t = sv->mem->NewRangeTombstoneIterator(
            read_options_, read_seq, false /* immutable_memtable */);
        if (!t || t->empty()) {
          // If memtable_range_tombstone_iter_ points to a non-empty tombstone
          // iterator, then it means sv->mem is not the memtable that
          // memtable_range_tombstone_iter_ points to, so SV must have changed
          // after the sv_number_ != cur_sv_number check above. We will fall
          // back to re-init the InternalIterator, and the tombstone iterator
          // will be freed during db_iter destruction there.
          if (memtable_range_tombstone_iter_) {
            assert(!*memtable_range_tombstone_iter_ ||
                   sv_number_ != cfd_->GetSuperVersionNumber());
          }
          delete t;
        } else {  // current mutable memtable has range tombstones
          if (!memtable_range_tombstone_iter_) {
            delete t;
            // Hand the SuperVersion back before rebuilding; the rebuild
            // acquires its own reference.
            db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
            // The memtable under DBIter did not have range tombstone before
            // refresh.
            reinit_internal_iter();
            break;
          } else {
            // Swap in a tombstone iterator bounded by the new read sequence.
            delete *memtable_range_tombstone_iter_;
            *memtable_range_tombstone_iter_ = new TruncatedRangeDelIterator(
                std::unique_ptr<FragmentedRangeTombstoneIterator>(t),
                &cfd_->internal_comparator(), nullptr, nullptr);
          }
        }
        db_impl_->ReturnAndCleanupSuperVersion(cfd_, sv);
      }
      // Check again if the latest super version number is changed
      uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
      if (latest_sv_number != cur_sv_number) {
        // If the super version number is changed after refreshing,
        // fallback to Re-Init the InternalIterator
        cur_sv_number = latest_sv_number;
        continue;
      }
      db_iter_->set_sequence(read_seq);
      db_iter_->set_valid(false);
      break;
    }
  }
  return Status::OK();
}
// Factory: heap-allocates an ArenaWrappedDBIter, initializes it with the
// given arguments, and returns it as a raw pointer. When refresh is allowed
// and both db_impl and cfd are non-null, the information Refresh() relies on
// is stored on the iterator as well.
ArenaWrappedDBIter* NewArenaWrappedDbIterator(
    Env* env, const ReadOptions& read_options, const ImmutableOptions& ioptions,
    const MutableCFOptions& mutable_cf_options, const Version* version,
    const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations,
    uint64_t version_number, ReadCallback* read_callback, DBImpl* db_impl,
    ColumnFamilyData* cfd, bool expose_blob_index, bool allow_refresh) {
  ArenaWrappedDBIter* wrapped = new ArenaWrappedDBIter();
  wrapped->Init(env, read_options, ioptions, mutable_cf_options, version,
                sequence, max_sequential_skip_in_iterations, version_number,
                read_callback, db_impl, cfd, expose_blob_index, allow_refresh);

  const bool can_refresh =
      db_impl != nullptr && cfd != nullptr && allow_refresh;
  if (can_refresh) {
    wrapped->StoreRefreshInfo(db_impl, cfd, read_callback, expose_blob_index);
  }
  return wrapped;
}
} // namespace ROCKSDB_NAMESPACE