diff --git a/db/db_iter.cc b/db/db_iter.cc index d74162c829..cd0073a047 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -73,6 +73,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options, cfd_(cfd), start_seqnum_(read_options.iter_start_seqnum), timestamp_ub_(read_options.timestamp), + timestamp_lb_(read_options.iter_start_ts), timestamp_size_(timestamp_ub_ ? timestamp_ub_->size() : 0) { RecordTick(statistics_, NO_ITERATOR_CREATED); if (pin_thru_lifetime_) { @@ -246,23 +247,22 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, assert(ikey_.user_key.size() >= timestamp_size_); Slice ts; + bool more_recent = false; if (timestamp_size_ > 0) { ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_); } - if (IsVisible(ikey_.sequence, ts)) { + if (IsVisible(ikey_.sequence, ts, &more_recent)) { // If the previous entry is of seqnum 0, the current entry will not // possibly be skipped. This condition can potentially be relaxed to // prev_key.seq <= ikey_.sequence. We are cautious because it will be more // prone to bugs causing the same user key with the same sequence number. if (!is_prev_key_seqnum_zero && skipping_saved_key && - user_comparator_.CompareWithoutTimestamp( - ikey_.user_key, saved_key_.GetUserKey()) <= 0) { + CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) <= 0) { num_skipped++; // skip this entry PERF_COUNTER_ADD(internal_key_skipped_count, 1); } else { assert(!skipping_saved_key || - user_comparator_.CompareWithoutTimestamp( - ikey_.user_key, saved_key_.GetUserKey()) > 0); + CompareKeyForSkip(ikey_.user_key, saved_key_.GetUserKey()) > 0); num_skipped = 0; reseek_done = false; switch (ikey_.type) { @@ -363,11 +363,13 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, } } } else { - PERF_COUNTER_ADD(internal_recent_skipped_count, 1); + if (more_recent) { + PERF_COUNTER_ADD(internal_recent_skipped_count, 1); + } - // This key was inserted after our snapshot was taken. 
- // If this happens too many times in a row for the same user key, we want - // to seek to the target sequence number. + // This key was inserted after our snapshot was taken or skipped by + // timestamp range. If this happens too many times in a row for the same + // user key, we want to seek to the target sequence number. int cmp = user_comparator_.CompareWithoutTimestamp( ikey_.user_key, saved_key_.GetUserKey()); if (cmp == 0 || (skipping_saved_key && cmp < 0)) { @@ -1101,20 +1103,24 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) { return false; } -bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts) { +bool DBIter::IsVisible(SequenceNumber sequence, const Slice& ts, + bool* more_recent) { // Remember that comparator orders preceding timestamp as larger. - int cmp_ts = timestamp_ub_ != nullptr - ? user_comparator_.CompareTimestamp(ts, *timestamp_ub_) - : 0; - if (cmp_ts > 0) { - return false; - } - if (read_callback_ == nullptr) { - return sequence <= sequence_; - } else { - // TODO(yanqin): support timestamp in read_callback_. - return read_callback_->IsVisible(sequence); + // TODO(yanqin): support timestamp in read_callback_. + bool visible_by_seq = (read_callback_ == nullptr) + ? sequence <= sequence_ + : read_callback_->IsVisible(sequence); + + bool visible_by_ts = + (timestamp_ub_ == nullptr || + user_comparator_.CompareTimestamp(ts, *timestamp_ub_) <= 0) && + (timestamp_lb_ == nullptr || + user_comparator_.CompareTimestamp(ts, *timestamp_lb_) >= 0); + + if (more_recent) { + *more_recent = !visible_by_seq; } + return visible_by_seq && visible_by_ts; } void DBIter::SetSavedKeyToSeekTarget(const Slice& target) { diff --git a/db/db_iter.h b/db/db_iter.h index c2f545c4f7..ed843952b6 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -231,7 +231,8 @@ class DBIter final : public Iterator { // entry can be found within the prefix. 
void PrevInternal(const Slice* prefix); bool TooManyInternalKeysSkipped(bool increment = true); - bool IsVisible(SequenceNumber sequence, const Slice& ts); + bool IsVisible(SequenceNumber sequence, const Slice& ts, + bool* more_recent = nullptr); // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called @@ -270,6 +271,15 @@ class DBIter final : public Iterator { return expect_total_order_inner_iter_; } + // If lower bound of timestamp is given by ReadOptions.iter_start_ts, we need + // to return versions of the same key. We cannot just skip if the key value + // is the same but timestamps are different but fall in timestamp range. + inline int CompareKeyForSkip(const Slice& a, const Slice& b) { + return timestamp_lb_ != nullptr + ? user_comparator_.Compare(a, b) + : user_comparator_.CompareWithoutTimestamp(a, b); + } + const SliceTransform* prefix_extractor_; Env* const env_; Logger* logger_; @@ -338,6 +348,7 @@ class DBIter final : public Iterator { // if this value > 0 iterator will return internal keys SequenceNumber start_seqnum_; const Slice* const timestamp_ub_; + const Slice* const timestamp_lb_; const size_t timestamp_size_; }; diff --git a/db/db_with_timestamp_basic_test.cc b/db/db_with_timestamp_basic_test.cc index 900582226c..f5bc19a4e6 100644 --- a/db/db_with_timestamp_basic_test.cc +++ b/db/db_with_timestamp_basic_test.cc @@ -235,6 +235,57 @@ TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) { Close(); } +TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterateLowerTsBound) { + const int kNumKeysPerFile = 128; + const uint64_t kMaxKey = 1024; + Options options = CurrentOptions(); + options.env = env_; + options.disable_auto_compactions = true; + options.create_if_missing = true; + const size_t kTimestampSize = Timestamp(0, 0).size(); + TestComparator test_cmp(kTimestampSize); + options.comparator = &test_cmp; + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + DestroyAndReopen(options); + 
const std::vector<std::string> write_timestamps = {Timestamp(1, 0), + Timestamp(3, 0)}; + const std::vector<std::string> read_timestamps = {Timestamp(2, 0), + Timestamp(4, 0)}; + const std::vector<std::string> read_timestamps_lb = {Timestamp(1, 0), + Timestamp(1, 0)}; + for (size_t i = 0; i < write_timestamps.size(); ++i) { + WriteOptions write_opts; + Slice write_ts = write_timestamps[i]; + write_opts.timestamp = &write_ts; + for (uint64_t key = 0; key <= kMaxKey; ++key) { + Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i)); + ASSERT_OK(s); + } + } + for (size_t i = 0; i < read_timestamps.size(); ++i) { + ReadOptions read_opts; + Slice read_ts = read_timestamps[i]; + Slice read_ts_lb = read_timestamps_lb[i]; + read_opts.timestamp = &read_ts; + read_opts.iter_start_ts = &read_ts_lb; + std::unique_ptr<Iterator> it(db_->NewIterator(read_opts)); + int count = 0; + uint64_t key = 0; + for (it->Seek(Key1(0)), key = 0; it->Valid(); it->Next(), ++count, ++key) { + CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i), + write_timestamps[i]); + if (i > 0) { + it->Next(); + CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i - 1), + write_timestamps[i - 1]); + } + } + size_t expected_count = kMaxKey + 1; + ASSERT_EQ(expected_count, count); + } + Close(); +} + TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) { const int kNumKeysPerFile = 128; const uint64_t kMaxKey = 0xffffffffffffffff; @@ -584,7 +635,7 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { std::vector<std::string> read_ts_list; const auto& verify_records_func = [&](size_t i, size_t begin, size_t end, - ColumnFamilyHandle* cfh) { + ColumnFamilyHandle* cfh) { std::string value; std::string timestamp; @@ -622,9 +673,9 @@ TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) { // higherlevel[0].largest.userkey ASSERT_OK(Flush(cf)); - // compact files (2 at each level) to a lower level such that all keys - // with the same timestamp is at one level, with newer 
versions at - // higher levels. + // compact files (2 at each level) to a lower level such that all + // keys with the same timestamp is at one level, with newer versions + // at higher levels. CompactionOptions compact_opt; compact_opt.compression = kNoCompression; db_->CompactFiles(compact_opt, handles_[cf], diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index dbc68b7cfd..1806536fb4 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1337,9 +1337,14 @@ struct ReadOptions { // specified timestamp. All timestamps of the same database must be of the // same length and format. The user is responsible for providing a customized // compare function via Comparator to order tuples. + // For iterator, iter_start_ts is the lower bound (older) and timestamp + // serves as the upper bound. Versions of the same record that fall in + // the timestamp range will be returned. If iter_start_ts is nullptr, + // only the most recent version visible to timestamp is returned. // The user-specified timestamp feature is still under active development, // and the API is subject to change. const Slice* timestamp; + const Slice* iter_start_ts; ReadOptions(); ReadOptions(bool cksum, bool cache); diff --git a/options/options.cc b/options/options.cc index 3a611af231..5841eb2e9b 100644 --- a/options/options.cc +++ b/options/options.cc @@ -607,7 +607,8 @@ ReadOptions::ReadOptions() background_purge_on_iterator_cleanup(false), ignore_range_deletions(false), iter_start_seqnum(0), - timestamp(nullptr) {} + timestamp(nullptr), + iter_start_ts(nullptr) {} ReadOptions::ReadOptions(bool cksum, bool cache) : snapshot(nullptr), @@ -627,6 +628,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) background_purge_on_iterator_cleanup(false), ignore_range_deletions(false), iter_start_seqnum(0), - timestamp(nullptr) {} + timestamp(nullptr), + iter_start_ts(nullptr) {} } // namespace ROCKSDB_NAMESPACE