mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
82b37a18bd
Summary: replace the super version acquisition in tailing iterator with thread local Test Plan: will post results Reviewers: igor, haobo, sdong, yhchiang, dhruba Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D17757
220 lines
6.1 KiB
C++
220 lines
6.1 KiB
C++
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
|
|
#include "db/tailing_iter.h"
|
|
|
|
#include <string>
|
|
#include <utility>
|
|
#include <vector>
|
|
#include "db/db_impl.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/column_family.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/slice.h"
|
|
#include "rocksdb/slice_transform.h"
|
|
#include "table/merger.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// Constructs a tailing iterator over column family `cfd` of database `db`.
//
// No super version is referenced here (super_version_ starts as nullptr) and
// the iterator is not positioned (current_ starts as nullptr). Until a seek
// succeeds, status() reports InvalidArgument with the message below.
TailingIterator::TailingIterator(Env* const env, DBImpl* db,
                                 const ReadOptions& read_options,
                                 ColumnFamilyData* cfd)
    : env_(env),
      db_(db),
      read_options_(read_options),
      cfd_(cfd),
      super_version_(nullptr),
      current_(nullptr),
      status_(Status::InvalidArgument("Seek() not called on this iterator")) {
}
|
|
|
|
// Releases the child iterators and the super version reference, if any, by
// delegating to Cleanup().
TailingIterator::~TailingIterator() { Cleanup(); }
|
|
|
|
bool TailingIterator::Valid() const {
|
|
return current_ != nullptr;
|
|
}
|
|
|
|
void TailingIterator::SeekToFirst() {
|
|
if (!IsCurrentVersion()) {
|
|
CreateIterators();
|
|
}
|
|
|
|
mutable_->SeekToFirst();
|
|
immutable_->SeekToFirst();
|
|
UpdateCurrent();
|
|
}
|
|
|
|
void TailingIterator::Seek(const Slice& target) {
|
|
if (!IsCurrentVersion()) {
|
|
CreateIterators();
|
|
}
|
|
|
|
mutable_->Seek(target);
|
|
|
|
// We maintain the interval (prev_key_, immutable_->key()] such that there
|
|
// are no records with keys within that range in immutable_ other than
|
|
// immutable_->key(). Since immutable_ can't change in this version, we don't
|
|
// need to do a seek if 'target' belongs to that interval (i.e. immutable_ is
|
|
// already at the correct position)!
|
|
//
|
|
// If options.prefix_seek is used and immutable_ is not valid, seek if target
|
|
// has a different prefix than prev_key.
|
|
//
|
|
// prev_key_ is updated by Next(). SeekImmutable() sets prev_key_ to
|
|
// 'target' -- in this case, prev_key_ is included in the interval, so
|
|
// prev_inclusive_ has to be set.
|
|
|
|
const Comparator* cmp = cfd_->user_comparator();
|
|
if (!is_prev_set_ || cmp->Compare(prev_key_, target) >= !is_prev_inclusive_ ||
|
|
(immutable_->Valid() && cmp->Compare(target, immutable_->key()) > 0) ||
|
|
(read_options_.prefix_seek && !IsSamePrefix(target))) {
|
|
SeekImmutable(target);
|
|
}
|
|
|
|
UpdateCurrent();
|
|
}
|
|
|
|
void TailingIterator::Next() {
|
|
assert(Valid());
|
|
|
|
if (!IsCurrentVersion()) {
|
|
// save the current key, create new iterators and then seek
|
|
std::string current_key = key().ToString();
|
|
Slice key_slice(current_key.data(), current_key.size());
|
|
|
|
CreateIterators();
|
|
Seek(key_slice);
|
|
|
|
if (!Valid() || key().compare(key_slice) != 0) {
|
|
// record with current_key no longer exists
|
|
return;
|
|
}
|
|
|
|
} else if (current_ == immutable_.get()) {
|
|
// immutable iterator is advanced -- update prev_key_
|
|
prev_key_ = key().ToString();
|
|
is_prev_inclusive_ = false;
|
|
is_prev_set_ = true;
|
|
}
|
|
|
|
current_->Next();
|
|
UpdateCurrent();
|
|
}
|
|
|
|
// Returns the key of the current record, as reported by whichever child
// iterator is currently selected. Requires Valid().
Slice TailingIterator::key() const {
  assert(Valid());
  const auto active = current_;
  return active->key();
}
|
|
|
|
// Returns the value of the current record, as reported by whichever child
// iterator is currently selected. Requires Valid().
Slice TailingIterator::value() const {
  assert(Valid());
  const auto active = current_;
  return active->value();
}
|
|
|
|
// Reports the first non-OK status in this order: the iterator's own status_
// (never seeked / unsupported operation), then mutable_, then immutable_.
// If all are OK, immutable_'s (OK) status is returned.
Status TailingIterator::status() const {
  // The child iterators are only consulted when status_ is OK.
  if (!status_.ok()) {
    return status_;
  }

  if (!mutable_->status().ok()) {
    return mutable_->status();
  }

  return immutable_->status();
}
|
|
|
|
// Backward iteration is not implemented: the call only records a
// NotSupported status and leaves the current position untouched. The status
// is reset to OK by the next UpdateCurrent().
void TailingIterator::Prev() {
  status_ = Status::NotSupported("This iterator doesn't support Prev()");
}
|
|
|
|
// Seeking to the last record is not implemented: the call only records a
// NotSupported status and leaves the current position untouched. The status
// is reset to OK by the next UpdateCurrent().
void TailingIterator::SeekToLast() {
  status_ = Status::NotSupported("This iterator doesn't support SeekToLast()");
}
|
|
|
|
void TailingIterator::Cleanup() {
|
|
// Release old super version if necessary
|
|
mutable_.reset();
|
|
immutable_.reset();
|
|
if (super_version_ != nullptr && super_version_->Unref()) {
|
|
DBImpl::DeletionState deletion_state;
|
|
db_->mutex_.Lock();
|
|
super_version_->Cleanup();
|
|
db_->FindObsoleteFiles(deletion_state, false, true);
|
|
db_->mutex_.Unlock();
|
|
delete super_version_;
|
|
if (deletion_state.HaveSomethingToDelete()) {
|
|
db_->PurgeObsoleteFiles(deletion_state);
|
|
}
|
|
}
|
|
}
|
|
|
|
void TailingIterator::CreateIterators() {
|
|
Cleanup();
|
|
super_version_= cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
|
|
|
Iterator* mutable_iter = super_version_->mem->NewIterator(read_options_);
|
|
// create a DBIter that only uses memtable content; see NewIterator()
|
|
mutable_.reset(
|
|
NewDBIterator(env_, *cfd_->options(), cfd_->user_comparator(),
|
|
mutable_iter, kMaxSequenceNumber));
|
|
|
|
std::vector<Iterator*> list;
|
|
super_version_->imm->AddIterators(read_options_, &list);
|
|
super_version_->current->AddIterators(
|
|
read_options_, *cfd_->soptions(), &list);
|
|
Iterator* immutable_iter =
|
|
NewMergingIterator(&cfd_->internal_comparator(), &list[0], list.size());
|
|
|
|
// create a DBIter that only uses memtable content; see NewIterator()
|
|
immutable_.reset(
|
|
NewDBIterator(env_, *cfd_->options(), cfd_->user_comparator(),
|
|
immutable_iter, kMaxSequenceNumber));
|
|
|
|
current_ = nullptr;
|
|
is_prev_set_ = false;
|
|
}
|
|
|
|
void TailingIterator::UpdateCurrent() {
|
|
current_ = nullptr;
|
|
|
|
if (mutable_->Valid()) {
|
|
current_ = mutable_.get();
|
|
}
|
|
const Comparator* cmp = cfd_->user_comparator();
|
|
if (immutable_->Valid() &&
|
|
(current_ == nullptr ||
|
|
cmp->Compare(immutable_->key(), current_->key()) < 0)) {
|
|
current_ = immutable_.get();
|
|
}
|
|
|
|
if (!status_.ok()) {
|
|
// reset status that was set by Prev() or SeekToLast()
|
|
status_ = Status::OK();
|
|
}
|
|
}
|
|
|
|
bool TailingIterator::IsCurrentVersion() const {
|
|
return super_version_ != nullptr &&
|
|
super_version_->version_number == cfd_->GetSuperVersionNumber();
|
|
}
|
|
|
|
bool TailingIterator::IsSamePrefix(const Slice& target) const {
|
|
const SliceTransform* extractor = cfd_->options()->prefix_extractor.get();
|
|
|
|
assert(extractor);
|
|
assert(is_prev_set_);
|
|
|
|
return extractor->Transform(target)
|
|
.compare(extractor->Transform(prev_key_)) == 0;
|
|
}
|
|
|
|
// Seeks immutable_ to `target` and records `target` as the new prev_key_,
// flagged as set and inclusive.
void TailingIterator::SeekImmutable(const Slice& target) {
  // Copy the target into prev_key_ before immutable_ is moved.
  prev_key_ = target.ToString();
  is_prev_set_ = true;
  is_prev_inclusive_ = true;

  immutable_->Seek(target);
}
|
|
|
|
} // namespace rocksdb
|