diff --git a/db/column_family.cc b/db/column_family.cc index 0923ff4484..73e695beca 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -477,13 +477,16 @@ void SuperVersion::Cleanup() { cfd->UnrefAndTryDelete(); } -void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem, - MemTableListVersion* new_imm, Version* new_current) { +void SuperVersion::Init( + ColumnFamilyData* new_cfd, MemTable* new_mem, MemTableListVersion* new_imm, + Version* new_current, + std::shared_ptr new_seqno_to_time_mapping) { cfd = new_cfd; mem = new_mem; imm = new_imm; current = new_current; full_history_ts_low = cfd->GetFullHistoryTsLow(); + seqno_to_time_mapping = std::move(new_seqno_to_time_mapping); cfd->Ref(); mem->Ref(); imm->Ref(); @@ -1196,9 +1199,10 @@ Status ColumnFamilyData::RangesOverlapWithMemtables( ReadOptions read_opts; read_opts.total_order_seek = true; MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena); - merge_iter_builder.AddIterator( - super_version->mem->NewIterator(read_opts, &arena)); - super_version->imm->AddIterators(read_opts, &merge_iter_builder, + merge_iter_builder.AddIterator(super_version->mem->NewIterator( + read_opts, /*seqno_to_time_mapping=*/nullptr, &arena)); + super_version->imm->AddIterators(read_opts, /*seqno_to_time_mapping=*/nullptr, + &merge_iter_builder, false /* add_range_tombstone_iter */); ScopedArenaIterator memtable_iter(merge_iter_builder.Finish()); @@ -1336,7 +1340,12 @@ void ColumnFamilyData::InstallSuperVersion( const MutableCFOptions& mutable_cf_options) { SuperVersion* new_superversion = sv_context->new_superversion.release(); new_superversion->mutable_cf_options = mutable_cf_options; - new_superversion->Init(this, mem_, imm_.current(), current_); + new_superversion->Init(this, mem_, imm_.current(), current_, + sv_context->new_seqno_to_time_mapping + ? std::move(sv_context->new_seqno_to_time_mapping) + : super_version_ + ? super_version_->ShareSeqnoToTimeMapping() + : nullptr); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; if (old_superversion == nullptr || old_superversion->current != current() || diff --git a/db/column_family.h b/db/column_family.h index b6b0ed8d5d..e76ceb5d46 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -26,6 +26,7 @@ #include "rocksdb/env.h" #include "rocksdb/options.h" #include "trace_replay/block_cache_tracer.h" +#include "util/cast_util.h" #include "util/hash_containers.h" #include "util/thread_local.h" @@ -219,6 +220,9 @@ struct SuperVersion { // enable UDT feature, this is an empty string. std::string full_history_ts_low; + // A shared copy of the DB's seqno to time mapping. + std::shared_ptr seqno_to_time_mapping{nullptr}; + // should be called outside the mutex SuperVersion() = default; ~SuperVersion(); @@ -232,8 +236,23 @@ struct SuperVersion { // that needs to be deleted in to_delete vector. Unrefing those // objects needs to be done in the mutex void Cleanup(); - void Init(ColumnFamilyData* new_cfd, MemTable* new_mem, - MemTableListVersion* new_imm, Version* new_current); + void Init( + ColumnFamilyData* new_cfd, MemTable* new_mem, + MemTableListVersion* new_imm, Version* new_current, + std::shared_ptr new_seqno_to_time_mapping); + + // Share the ownership of the seqno to time mapping object referred to in this + // SuperVersion. To be used by the new SuperVersion to be installed after this + // one if seqno to time mapping does not change in between these two + // SuperVersions. 
+ std::shared_ptr ShareSeqnoToTimeMapping() { + return seqno_to_time_mapping; + } + + // Access the seqno to time mapping object in this SuperVersion. + UnownedPtr GetSeqnoToTimeMapping() const { + return seqno_to_time_mapping.get(); + } // The value of dummy is not actually used. kSVInUse takes its address as a // mark in the thread local storage to indicate the SuperVersion is in use diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index cd4b649487..3fe800c43c 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -14,6 +14,7 @@ #include "rocksdb/listener.h" #include "rocksdb/utilities/debug.h" #include "test_util/mock_time_env.h" +#include "utilities/merge_operators.h" namespace ROCKSDB_NAMESPACE { @@ -1307,8 +1308,8 @@ TEST_F(TieredCompactionTest, CheckInternalKeyRange) { class PrecludeLastLevelTest : public DBTestBase { public: - PrecludeLastLevelTest() - : DBTestBase("preclude_last_level_test", /*env_do_fsync=*/false) { + PrecludeLastLevelTest(std::string test_name = "preclude_last_level_test") + : DBTestBase(test_name, /*env_do_fsync=*/false) { mock_clock_ = std::make_shared(env_->GetSystemClock()); mock_clock_->SetCurrentTime(kMockStartTime); mock_env_ = std::make_unique(env_, mock_clock_); @@ -2256,6 +2257,253 @@ TEST_F(PrecludeLastLevelTest, RangeDelsCauseFileEndpointsToOverlap) { Close(); } +// Tests DBIter::GetProperty("rocksdb.iterator.write-time") return a data's +// approximate write unix time. +// Test Param: +// 1) use tailing iterator or regular iterator (when it applies) +class IteratorWriteTimeTest : public PrecludeLastLevelTest, + public testing::WithParamInterface { + public: + IteratorWriteTimeTest() : PrecludeLastLevelTest("iterator_write_time_test") {} + + uint64_t VerifyKeyAndGetWriteTime(Iterator* iter, + const std::string& expected_key) { + std::string prop; + uint64_t write_time = 0; + EXPECT_TRUE(iter->Valid()); + EXPECT_EQ(expected_key, iter->key()); + EXPECT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop)); + Slice prop_slice = prop; + EXPECT_TRUE(GetFixed64(&prop_slice, &write_time)); + return write_time; + } + + void VerifyKeyAndWriteTime(Iterator* iter, const std::string& expected_key, + uint64_t expected_write_time) { + std::string prop; + uint64_t write_time = 0; + EXPECT_TRUE(iter->Valid()); + EXPECT_EQ(expected_key, iter->key()); + EXPECT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop)); + Slice prop_slice = prop; + EXPECT_TRUE(GetFixed64(&prop_slice, &write_time)); + EXPECT_EQ(expected_write_time, write_time); + } +}; + +TEST_P(IteratorWriteTimeTest, ReadFromMemtables) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kSecondsPerRecording = 101; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.preserve_internal_time_seconds = 10000; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < kNumKeys; i++) { + dbfull()->TEST_WaitForPeriodicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } + + ReadOptions ropts; + ropts.tailing = GetParam(); + int i; + + // Forward iteration + uint64_t start_time = 0; + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + for (iter->SeekToFirst(), i = 0; iter->Valid(); 
iter->Next(), i++) { + if (start_time == 0) { + start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i)); + } else { + VerifyKeyAndWriteTime(iter.get(), Key(i), + start_time + kSecondsPerRecording * (i + 1)); + } + } + ASSERT_OK(iter->status()); + } + + // Backward iteration + { + ropts.tailing = false; + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + for (iter->SeekToLast(), i = kNumKeys - 1; iter->Valid(); + iter->Prev(), i--) { + if (i == 0) { + VerifyKeyAndWriteTime(iter.get(), Key(i), start_time); + } else { + VerifyKeyAndWriteTime(iter.get(), Key(i), + start_time + kSecondsPerRecording * (i + 1)); + } + } + ASSERT_OK(iter->status()); + } + Close(); +} + +TEST_P(IteratorWriteTimeTest, ReadFromSstFile) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + const int kSecondsPerRecording = 101; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.preserve_internal_time_seconds = 10000; + options.num_levels = kNumLevels; + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < kNumKeys; i++) { + dbfull()->TEST_WaitForPeriodicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); + ASSERT_OK(Put(Key(i), rnd.RandomString(100))); + } + + ASSERT_OK(Flush()); + ReadOptions ropts; + ropts.tailing = GetParam(); + std::string prop; + int i; + + // Forward iteration + uint64_t start_time = 0; + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + for (iter->SeekToFirst(), i = 0; iter->Valid(); iter->Next(), i++) { + if (start_time == 0) { + start_time = VerifyKeyAndGetWriteTime(iter.get(), Key(i)); + } else { + VerifyKeyAndWriteTime(iter.get(), Key(i), + start_time + kSecondsPerRecording * (i + 1)); + } + } + ASSERT_OK(iter->status()); + } + + // Backward iteration + { + ropts.tailing = false; + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + for (iter->SeekToLast(), i = kNumKeys - 1; iter->Valid(); + iter->Prev(), i--) { + if (i == 0) { + VerifyKeyAndWriteTime(iter.get(), Key(i), start_time); + } else { + VerifyKeyAndWriteTime(iter.get(), Key(i), + start_time + kSecondsPerRecording * (i + 1)); + } + } + ASSERT_OK(iter->status()); + } + + // Reopen the DB and disable the seqno to time recording. Data retrieved from + // SST files still have write time available. + options.preserve_internal_time_seconds = 0; + DestroyAndReopen(options); + + dbfull()->TEST_WaitForPeriodicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); + ASSERT_OK(Put("a", "val")); + ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty()); + + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + // "a" is retrieved from memtable, its write time is unknown because the + // seqno to time mapping recording is not available. + VerifyKeyAndWriteTime(iter.get(), "a", + std::numeric_limits::max()); + for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) { + if (i == 0) { + VerifyKeyAndWriteTime(iter.get(), Key(i), start_time); + } else { + VerifyKeyAndWriteTime(iter.get(), Key(i), + start_time + kSecondsPerRecording * (i + 1)); + } + } + ASSERT_OK(iter->status()); + } + + // There is no write time info for "a" after it's flushed to SST file either. 
+ ASSERT_OK(Flush()); + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + VerifyKeyAndWriteTime(iter.get(), "a", + std::numeric_limits::max()); + } + + // Sequence number zeroed out after compacted to the last level, write time + // all becomes zero. + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + iter->SeekToFirst(); + for (iter->Next(), i = 0; iter->Valid(); iter->Next(), i++) { + VerifyKeyAndWriteTime(iter.get(), Key(i), 0); + } + ASSERT_OK(iter->status()); + } + Close(); +} + +TEST_P(IteratorWriteTimeTest, MergeReturnsBaseValueWriteTime) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kSecondsPerRecording = 101; + + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.env = mock_env_.get(); + options.level0_file_num_compaction_trigger = kNumTrigger; + options.preserve_internal_time_seconds = 10000; + options.num_levels = kNumLevels; + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + DestroyAndReopen(options); + + dbfull()->TEST_WaitForPeriodicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); + ASSERT_OK(Put("foo", "fv1")); + + dbfull()->TEST_WaitForPeriodicTaskRun( + [&] { mock_clock_->MockSleepForSeconds(kSecondsPerRecording); }); + ASSERT_OK(Put("bar", "bv1")); + ASSERT_OK(Merge("foo", "bv1")); + + ReadOptions ropts; + ropts.tailing = GetParam(); + { + std::unique_ptr iter(dbfull()->NewIterator(ropts)); + iter->SeekToFirst(); + uint64_t bar_time = VerifyKeyAndGetWriteTime(iter.get(), "bar"); + iter->Next(); + uint64_t foo_time = VerifyKeyAndGetWriteTime(iter.get(), "foo"); + // "foo" has an older write time because its base value's write time is used + ASSERT_GT(bar_time, foo_time); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + + Close(); +} + +INSTANTIATE_TEST_CASE_P(IteratorWriteTimeTest, IteratorWriteTimeTest, + testing::Bool()); + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index ba3a75572e..ef3ce78b46 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -1979,7 +1980,8 @@ InternalIterator* DBImpl::NewInternalIterator( super_version->mutable_cf_options.prefix_extractor != nullptr, read_options.iterate_upper_bound); // Collect iterator for mutable memtable - auto mem_iter = super_version->mem->NewIterator(read_options, arena); + auto mem_iter = super_version->mem->NewIterator( + read_options, super_version->GetSeqnoToTimeMapping(), arena); Status s; if (!read_options.ignore_range_deletions) { TruncatedRangeDelIterator* mem_tombstone_iter = nullptr; @@ -2001,8 +2003,9 @@ InternalIterator* DBImpl::NewInternalIterator( // Collect all needed child iterators for immutable memtables if (s.ok()) { - super_version->imm->AddIterators(read_options, &merge_iter_builder, - !read_options.ignore_range_deletions); + super_version->imm->AddIterators( + read_options, super_version->GetSeqnoToTimeMapping(), + &merge_iter_builder, !read_options.ignore_range_deletions); } TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s); if (s.ok()) { @@ -6466,6 +6469,8 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) { immutable_db_options_.clock->GetCurrentTime(&unix_time_signed) 
.PermitUncheckedError(); // Ignore error uint64_t unix_time = static_cast(unix_time_signed); + + std::vector sv_contexts; if (populate_historical_seconds > 0) { bool success = true; { @@ -6476,6 +6481,7 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) { success = seqno_to_time_mapping_.PrePopulate( from_seqno, seqno, unix_time - populate_historical_seconds, unix_time); + InstallSeqnoToTimeMappingInSV(&sv_contexts); } else { // One of these will fail assert(seqno > 1); @@ -6501,7 +6507,31 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) { // FIXME: assert(seqno > 0); // Always successful assuming seqno never go backwards seqno_to_time_mapping_.Append(seqno, unix_time); + InstallSeqnoToTimeMappingInSV(&sv_contexts); + } + + // clean up outside db mutex + for (SuperVersionContext& sv_context : sv_contexts) { + sv_context.Clean(); } } +void DBImpl::InstallSeqnoToTimeMappingInSV( + std::vector* sv_contexts) { + mutex_.AssertHeld(); + std::shared_ptr new_seqno_to_time_mapping = + std::make_shared(); + new_seqno_to_time_mapping->CopyFrom(seqno_to_time_mapping_); + for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + sv_contexts->emplace_back(/*create_superversion=*/true); + sv_contexts->back().new_seqno_to_time_mapping = new_seqno_to_time_mapping; + cfd->InstallSuperVersion(&sv_contexts->back(), + *(cfd->GetLatestMutableCFOptions())); + } + bg_cv_.SignalAll(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 29264474bd..a7181c9e0f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1226,6 +1226,22 @@ class DBImpl : public DB { // populate_historical_seconds, now]. void RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds); + // Everytime DB's seqno to time mapping changed (which already hold the db + // mutex), we install a new SuperVersion in each column family with a shared + // copy of the new mapping while holding the db mutex. + // This is done for all column families even though the column family does not + // explicitly enabled the + // `preclude_last_level_data_seconds` or `preserve_internal_time_seconds` + // features. + // This mapping supports iterators to fulfill the + // "rocksdb.iterator.write-time" iterator property for entries in memtables. + // + // Since this new SuperVersion doesn't involve an LSM tree shape change, we + // don't schedule work after installing this SuperVersion. It returns the used + // `SuperVersionContext` for clean up after release mutex. + void InstallSeqnoToTimeMappingInSV( + std::vector* sv_contexts); + // Interface to block and signal the DB in case of stalling writes by // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface. 
// When DB needs to be blocked or signalled by WriteBufferManager, diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 35af4dcbef..7990561757 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1630,7 +1630,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, Status s; TableProperties table_properties; { - ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + ScopedArenaIterator iter( + mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena)); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" " Level-0 table #%" PRIu64 ": started", diff --git a/db/db_iter.cc b/db/db_iter.cc index 90e19e95d2..fd80287762 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -113,8 +113,8 @@ Status DBIter::GetProperty(std::string prop_name, std::string* prop) { *prop = saved_key_.GetUserKey().ToString(); return Status::OK(); } else if (prop_name == "rocksdb.iterator.write-time") { - // TODO(yuzhangyu): implement return the actual write time. - return Status::NotSupported("write time property is under construction"); + PutFixed64(prop, saved_write_unix_time_); + return Status::OK(); } return Status::InvalidArgument("Unidentified property."); } @@ -421,6 +421,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, assert(ikey_.type == kTypeValue || ikey_.type == kTypeValuePreferredSeqno); Slice value = iter_.value(); + saved_write_unix_time_ = iter_.write_unix_time(); if (ikey_.type == kTypeValuePreferredSeqno) { value = ParsePackedValueForValue(value); } @@ -582,6 +583,7 @@ bool DBIter::MergeValuesNewToOld() { if (kTypeValue == ikey.type || kTypeValuePreferredSeqno == ikey.type) { Slice value = iter_.value(); + saved_write_unix_time_ = iter_.write_unix_time(); if (kTypeValuePreferredSeqno == ikey.type) { value = ParsePackedValueForValue(value); } @@ -931,6 +933,7 @@ bool DBIter::FindValueForCurrentKey() { case kTypeBlobIndex: case kTypeWideColumnEntity: if (iter_.iter()->IsValuePinned()) { + saved_write_unix_time_ = iter_.write_unix_time(); if (last_key_entry_type == kTypeValuePreferredSeqno) { pinned_value_ = ParsePackedValueForValue(iter_.value()); } else { @@ -1162,6 +1165,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { if (ikey.type == kTypeValue || ikey.type == kTypeValuePreferredSeqno || ikey.type == kTypeBlobIndex || ikey.type == kTypeWideColumnEntity) { assert(iter_.iter()->IsValuePinned()); + saved_write_unix_time_ = iter_.write_unix_time(); if (ikey.type == kTypeValuePreferredSeqno) { pinned_value_ = ParsePackedValueForValue(iter_.value()); } else { diff --git a/db/db_iter.h b/db/db_iter.h index 1882b02b23..d3c6db4966 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -367,6 +367,12 @@ class DBIter final : public Iterator { // and should not be used across functions. Reusing this object can reduce // overhead of calling construction of the function if creating it each time. ParsedInternalKey ikey_; + + // TODO(yuzhangyu): update this documentation for kTypeValuePreferredSeqno + // types. + // The approximate write time for the entry. It is deduced from the entry's + // sequence number if the seqno to time mapping is available. 
+ uint64_t saved_write_unix_time_; std::string saved_value_; Slice pinned_value_; // for prefix seek mode to support prev() diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index 39ef21db16..7c3bdd850f 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -142,6 +142,13 @@ TEST_P(DBIteratorTest, IteratorProperty) { // Get internal key at which the iteration stopped (tombstone in this case). ASSERT_OK(iter->GetProperty("rocksdb.iterator.internal-key", &prop_value)); ASSERT_EQ("2", prop_value); + + prop_value.clear(); + ASSERT_OK(iter->GetProperty("rocksdb.iterator.write-time", &prop_value)); + uint64_t write_time; + Slice prop_slice = prop_value; + ASSERT_TRUE(GetFixed64(&prop_slice, &write_time)); + ASSERT_EQ(std::numeric_limits::max(), write_time); } Close(); } diff --git a/db/dbformat.h b/db/dbformat.h index 5b16726693..fdde564582 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -374,6 +374,13 @@ inline ValueType ExtractValueType(const Slice& internal_key) { return static_cast(c); } +// input [internal key]: +// output: +inline SequenceNumber ExtractSequenceNumber(const Slice& internal_key) { + uint64_t num = ExtractInternalKeyFooter(internal_key); + return num >> 8; +} + // A comparator for internal keys that uses a specified comparator for // the user key portion and breaks ties by decreasing sequence number. class InternalKeyComparator diff --git a/db/flush_job.cc b/db/flush_job.cc index 9340ea18a2..78a73c12f7 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -417,7 +417,8 @@ Status FlushJob::MemPurge() { std::vector> range_del_iters; for (MemTable* m : mems_) { - memtables.push_back(m->NewIterator(ro, &arena)); + memtables.push_back( + m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena)); auto* range_del_iter = m->NewRangeTombstoneIterator( ro, kMaxSequenceNumber, true /* immutable_memtable */); if (range_del_iter != nullptr) { @@ -897,7 +898,8 @@ Status FlushJob::WriteLevel0Table() { db_options_.info_log, "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n", cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber()); - memtables.push_back(m->NewIterator(ro, &arena)); + memtables.push_back( + m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena)); auto* range_del_iter = m->NewRangeTombstoneIterator( ro, kMaxSequenceNumber, true /* immutable_memtable */); if (range_del_iter != nullptr) { diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index a302843c40..a4cbdb4667 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -611,6 +611,11 @@ Slice ForwardIterator::key() const { return current_->key(); } +uint64_t ForwardIterator::write_unix_time() const { + assert(valid_); + return current_->write_unix_time(); +} + Slice ForwardIterator::value() const { assert(valid_); return current_->value(); @@ -704,8 +709,12 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { } ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(), kMaxSequenceNumber /* upper_bound */); - mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); - sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); + UnownedPtr seqno_to_time_mapping = + sv_->GetSeqnoToTimeMapping(); + mutable_iter_ = + sv_->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_); + sv_->imm->AddIterators(read_options_, seqno_to_time_mapping, &imm_iters_, + &arena_); if (!read_options_.ignore_range_deletions) { std::unique_ptr range_del_iter( sv_->mem->NewRangeTombstoneIterator( @@ -769,8 +778,12 @@ void 
ForwardIterator::RenewIterators() { } imm_iters_.clear(); - mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_); - svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_); + UnownedPtr seqno_to_time_mapping = + svnew->GetSeqnoToTimeMapping(); + mutable_iter_ = + svnew->mem->NewIterator(read_options_, seqno_to_time_mapping, &arena_); + svnew->imm->AddIterators(read_options_, seqno_to_time_mapping, &imm_iters_, + &arena_); ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(), kMaxSequenceNumber /* upper_bound */); if (!read_options_.ignore_range_deletions) { diff --git a/db/forward_iterator.h b/db/forward_iterator.h index 71d9a85105..9f1b4379b9 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -76,6 +76,7 @@ class ForwardIterator : public InternalIterator { void Next() override; Slice key() const override; Slice value() const override; + uint64_t write_unix_time() const override; Status status() const override; bool PrepareValue() override; Status GetProperty(std::string prop_name, std::string* prop) override; diff --git a/db/job_context.h b/db/job_context.h index 48728f48d6..272b79a216 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -35,6 +35,12 @@ struct SuperVersionContext { std::unique_ptr new_superversion; // if nullptr no new superversion + // If not nullptr, a new seqno to time mapping is available to be installed. + // Otherwise, make a shared copy of the one in the existing SuperVersion and + // carry it over to the new SuperVersion. This is moved to the SuperVersion + // during installation. + std::shared_ptr new_seqno_to_time_mapping{nullptr}; + explicit SuperVersionContext(bool create_superversion = false) : new_superversion(create_superversion ? new SuperVersion() : nullptr) {} diff --git a/db/memtable.cc b/db/memtable.cc index 7ffd5dcdc4..4197dd9f47 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -364,11 +364,13 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator : public InternalIterator { public: MemTableIterator(const MemTable& mem, const ReadOptions& read_options, + UnownedPtr seqno_to_time_mapping, Arena* arena, bool use_range_del_table = false) : bloom_(nullptr), prefix_extractor_(mem.prefix_extractor_), comparator_(mem.comparator_), valid_(false), + seqno_to_time_mapping_(seqno_to_time_mapping), arena_mode_(arena != nullptr), value_pinned_( !mem.GetImmutableMemTableOptions()->inplace_update_support), @@ -499,6 +501,18 @@ class MemTableIterator : public InternalIterator { assert(Valid()); return GetLengthPrefixedSlice(iter_->key()); } + + uint64_t write_unix_time() const override { + assert(Valid()); + // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno, + // parse its unix write time out of packed value. + if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) { + return std::numeric_limits::max(); + } + SequenceNumber seqno = ExtractSequenceNumber(key()); + return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(seqno); + } + Slice value() const override { assert(Valid()); Slice key_slice = GetLengthPrefixedSlice(iter_->key()); @@ -523,6 +537,8 @@ class MemTableIterator : public InternalIterator { const MemTable::KeyComparator comparator_; MemTableRep::Iterator* iter_; bool valid_; + // The seqno to time mapping is owned by the SuperVersion. 
+ UnownedPtr seqno_to_time_mapping_; bool arena_mode_; bool value_pinned_; uint32_t protection_bytes_per_key_; @@ -541,11 +557,13 @@ class MemTableIterator : public InternalIterator { } }; -InternalIterator* MemTable::NewIterator(const ReadOptions& read_options, - Arena* arena) { +InternalIterator* MemTable::NewIterator( + const ReadOptions& read_options, + UnownedPtr seqno_to_time_mapping, Arena* arena) { assert(arena != nullptr); auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); - return new (mem) MemTableIterator(*this, read_options, arena); + return new (mem) + MemTableIterator(*this, read_options, seqno_to_time_mapping, arena); } FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator( @@ -579,9 +597,9 @@ FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( if (!cache->initialized.load(std::memory_order_acquire)) { cache->reader_mutex.lock(); if (!cache->tombstones) { - auto* unfragmented_iter = - new MemTableIterator(*this, read_options, nullptr /* arena */, - true /* use_range_del_table */); + auto* unfragmented_iter = new MemTableIterator( + *this, read_options, nullptr /* seqno_to_time_mapping= */, + nullptr /* arena */, true /* use_range_del_table */); cache->tombstones.reset(new FragmentedRangeTombstoneList( std::unique_ptr(unfragmented_iter), comparator_.comparator)); @@ -600,9 +618,9 @@ void MemTable::ConstructFragmentedRangeTombstones() { // There should be no concurrent Construction if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) { // TODO: plumb Env::IOActivity, Env::IOPriority - auto* unfragmented_iter = - new MemTableIterator(*this, ReadOptions(), nullptr /* arena */, - true /* use_range_del_table */); + auto* unfragmented_iter = new MemTableIterator( + *this, ReadOptions(), nullptr /*seqno_to_time_mapping=*/, + nullptr /* arena */, true /* use_range_del_table */); fragmented_range_tombstone_list_ = std::make_unique( diff --git a/db/memtable.h b/db/memtable.h index 7f3db0d05b..730258f05c 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -20,6 +20,7 @@ #include "db/kv_checksum.h" #include "db/range_tombstone_fragmenter.h" #include "db/read_callback.h" +#include "db/seqno_to_time_mapping.h" #include "db/version_edit.h" #include "memory/allocator.h" #include "memory/concurrent_arena.h" @@ -28,6 +29,7 @@ #include "rocksdb/db.h" #include "rocksdb/memtablerep.h" #include "table/multiget_context.h" +#include "util/cast_util.h" #include "util/dynamic_bloom.h" #include "util/hash.h" #include "util/hash_containers.h" @@ -203,7 +205,11 @@ class MemTable { // arena: If not null, the arena needs to be used to allocate the Iterator. // Calling ~Iterator of the iterator will destroy all the states but // those allocated in arena. - InternalIterator* NewIterator(const ReadOptions& read_options, Arena* arena); + // seqno_to_time_mapping: it's used to support return write unix time for the + // data, currently only needed for iterators serving user reads. + InternalIterator* NewIterator( + const ReadOptions& read_options, + UnownedPtr seqno_to_time_mapping, Arena* arena); // Returns an iterator that yields the range tombstones of the memtable. 
// The caller must ensure that the underlying MemTable remains live diff --git a/db/memtable_list.cc b/db/memtable_list.cc index a65d3914b6..ffa9de111f 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -211,18 +211,22 @@ Status MemTableListVersion::AddRangeTombstoneIterators( } void MemTableListVersion::AddIterators( - const ReadOptions& options, std::vector* iterator_list, - Arena* arena) { + const ReadOptions& options, + UnownedPtr seqno_to_time_mapping, + std::vector* iterator_list, Arena* arena) { for (auto& m : memlist_) { - iterator_list->push_back(m->NewIterator(options, arena)); + iterator_list->push_back( + m->NewIterator(options, seqno_to_time_mapping, arena)); } } -void MemTableListVersion::AddIterators(const ReadOptions& options, - MergeIteratorBuilder* merge_iter_builder, - bool add_range_tombstone_iter) { +void MemTableListVersion::AddIterators( + const ReadOptions& options, + UnownedPtr seqno_to_time_mapping, + MergeIteratorBuilder* merge_iter_builder, bool add_range_tombstone_iter) { for (auto& m : memlist_) { - auto mem_iter = m->NewIterator(options, merge_iter_builder->GetArena()); + auto mem_iter = m->NewIterator(options, seqno_to_time_mapping, + merge_iter_builder->GetArena()); if (!add_range_tombstone_iter || options.ignore_range_deletions) { merge_iter_builder->AddIterator(mem_iter); } else { diff --git a/db/memtable_list.h b/db/memtable_list.h index 771ce89b9c..218701e0b3 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -112,10 +112,12 @@ class MemTableListVersion { RangeDelAggregator* range_del_agg); void AddIterators(const ReadOptions& options, + UnownedPtr seqno_to_time_mapping, std::vector* iterator_list, Arena* arena); void AddIterators(const ReadOptions& options, + UnownedPtr seqno_to_time_mapping, MergeIteratorBuilder* merge_iter_builder, bool add_range_tombstone_iter); diff --git a/db/repair.cc b/db/repair.cc index 789454d361..eddafe1332 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -443,7 +443,8 @@ class Repairer { ReadOptions ro; ro.total_order_seek = true; Arena arena; - ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + ScopedArenaIterator iter( + mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena)); int64_t _current_time = 0; immutable_db_options_.clock->GetCurrentTime(&_current_time) .PermitUncheckedError(); // ignore error diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 623adb76dd..3fe84927a2 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -59,7 +59,8 @@ static std::string PrintContents(WriteBatch* b, std::unique_ptr iter_guard; InternalIterator* iter; if (i == 0) { - iter = mem->NewIterator(ReadOptions(), &arena); + iter = mem->NewIterator(ReadOptions(), /*seqno_to_time_mapping=*/nullptr, + &arena); arena_iter_guard.set(iter); } else { iter = mem->NewRangeTombstoneIterator(ReadOptions(), diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 5c4ead0f53..0cddf4a334 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -137,15 +137,18 @@ class Iterator : public Cleanable { // Get the user-key portion of the internal key at which the iteration // stopped. // Property "rocksdb.iterator.write-time": - // DO NOT USE, UNDER CONSTRUCTION // Get the unix time of the best estimate of the write time of the entry. // Returned as 64-bit raw value (8 bytes). It can be converted to uint64_t // with util method `DecodeU64Ts`. The accuracy of the write time depends on - // settings like preserve_internal_time_seconds. 
If this feature is - // disabled, this property will always be empty. The actual write time of + // settings like preserve_internal_time_seconds. The actual write time of // the entry should be the same or newer than the returned write time. So // this property can be interpreted as the possible oldest write time for // the entry. + // If the seqno to time mapping recording is not enabled, + // std::numeric_limits::max() will be returned to indicate the + // write time is unknown. For data entry whose sequence number has + // been zeroed out (possible when they reach the last level), 0 is returned + // no matter whether the seqno to time recording feature is enabled or not. virtual Status GetProperty(std::string prop_name, std::string* prop); virtual Slice timestamp() const { diff --git a/java/rocksjni/write_batch_test.cc b/java/rocksjni/write_batch_test.cc index 30b9a72297..f3163374cb 100644 --- a/java/rocksjni/write_batch_test.cc +++ b/java/rocksjni/write_batch_test.cc @@ -60,7 +60,8 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents(JNIEnv* env, unsigned int count = 0; ROCKSDB_NAMESPACE::Arena arena; ROCKSDB_NAMESPACE::ScopedArenaIterator iter( - mem->NewIterator(ROCKSDB_NAMESPACE::ReadOptions(), &arena)); + mem->NewIterator(ROCKSDB_NAMESPACE::ReadOptions(), + /*seqno_to_time_mapping=*/nullptr, &arena)); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ROCKSDB_NAMESPACE::ParsedInternalKey ikey; ikey.clear(); diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h index ec7c330944..0ba0e3e289 100644 --- a/table/block_based/block_based_table_iterator.h +++ b/table/block_based/block_based_table_iterator.h @@ -9,6 +9,7 @@ #pragma once #include +#include "db/seqno_to_time_mapping.h" #include "table/block_based/block_based_table_reader.h" #include "table/block_based/block_based_table_reader_impl.h" #include "table/block_based/block_prefetcher.h" @@ -92,6 +93,22 @@ class BlockBasedTableIterator : public InternalIteratorBase { return const_cast(this) ->MaterializeCurrentBlock(); } + + uint64_t write_unix_time() const override { + assert(Valid()); + // TODO(yuzhangyu): if value type is kTypeValuePreferredSeqno, + // parse its unix write time out of packed value. + const SeqnoToTimeMapping& seqno_to_time_mapping = + table_->GetSeqnoToTimeMapping(); + SequenceNumber seqno = ExtractSequenceNumber(key()); + if (kUnknownSeqnoBeforeAll == seqno) { + return kUnknownTimeBeforeAll; + } else if (seqno_to_time_mapping.Empty()) { + return std::numeric_limits::max(); + } + return seqno_to_time_mapping.GetProximalTimeBeforeSeqno(seqno); + } + Slice value() const override { // PrepareValue() must have been called. 
assert(!is_at_first_key_from_index_); diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index c4afdb637f..27c34361c3 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -921,6 +921,17 @@ Status BlockBasedTable::ReadPropertiesBlock( } else { assert(table_properties != nullptr); rep_->table_properties = std::move(table_properties); + + if (s.ok()) { + s = rep_->seqno_to_time_mapping.DecodeFrom( + rep_->table_properties->seqno_to_time_mapping); + } + if (!s.ok()) { + ROCKS_LOG_WARN( + rep_->ioptions.logger, + "Problem reading or processing seqno-to-time mapping: %s", + s.ToString().c_str()); + } rep_->blocks_maybe_compressed = rep_->table_properties->compression_name != CompressionTypeToString(kNoCompression); @@ -1233,6 +1244,10 @@ std::shared_ptr BlockBasedTable::GetTableProperties() return rep_->table_properties; } +const SeqnoToTimeMapping& BlockBasedTable::GetSeqnoToTimeMapping() const { + return rep_->seqno_to_time_mapping; +} + size_t BlockBasedTable::ApproximateMemoryUsage() const { size_t usage = 0; if (rep_) { diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index 262b53a7ae..a98d7c78be 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -16,6 +16,7 @@ #include "cache/cache_key.h" #include "cache/cache_reservation_manager.h" #include "db/range_tombstone_fragmenter.h" +#include "db/seqno_to_time_mapping.h" #include "file/filename.h" #include "rocksdb/slice_transform.h" #include "rocksdb/table_properties.h" @@ -197,6 +198,8 @@ class BlockBasedTable : public TableReader { std::shared_ptr GetTableProperties() const override; + const SeqnoToTimeMapping& GetSeqnoToTimeMapping() const; + size_t ApproximateMemoryUsage() const override; // convert SST file to a human readable form @@ -607,6 +610,7 @@ struct BlockBasedTable::Rep { BlockHandle compression_dict_handle; std::shared_ptr table_properties; + SeqnoToTimeMapping seqno_to_time_mapping; BlockHandle index_handle; BlockBasedTableOptions::IndexType index_type; bool whole_key_filtering; diff --git a/table/internal_iterator.h b/table/internal_iterator.h index 2cb4f1098a..8ecbb0f90b 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -116,6 +116,14 @@ class InternalIteratorBase : public Cleanable { // REQUIRES: Valid() virtual Slice key() const = 0; + // Returns the approximate write time of this entry, which is deduced from + // sequence number if sequence number to time mapping is available. + // The default implementation returns maximum uint64_t and that indicates the + // write time is unknown. + virtual uint64_t write_unix_time() const { + return std::numeric_limits::max(); + } + // Return user key for the current entry. 
// REQUIRES: Valid() virtual Slice user_key() const { return ExtractUserKey(key()); } diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index 41da2a66ce..b53076910e 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -82,6 +82,12 @@ class IteratorWrapperBase { assert(Valid()); return result_.key; } + + uint64_t write_unix_time() const { + assert(Valid()); + return iter_->write_unix_time(); + } + TValue value() const { assert(Valid()); return iter_->value(); diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 247564fe7b..833c6123ee 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -430,6 +430,11 @@ class MergingIterator : public InternalIterator { return current_->key(); } + uint64_t write_unix_time() const override { + assert(Valid()); + return current_->write_unix_time(); + } + Slice value() const override { assert(Valid()); return current_->value(); diff --git a/table/table_test.cc b/table/table_test.cc index 9526bd9f01..432799468b 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -534,7 +534,9 @@ class MemTableConstructor : public Constructor { InternalIterator* NewIterator( const SliceTransform* /*prefix_extractor*/) const override { return new KeyConvertingIterator( - memtable_->NewIterator(ReadOptions(), &arena_), true); + memtable_->NewIterator(ReadOptions(), /*seqno_to_time_mapping=*/nullptr, + &arena_), + true); } bool AnywayDeleteIterator() const override { return true; } @@ -4897,7 +4899,8 @@ TEST_F(MemTableTest, Simple) { std::unique_ptr iter_guard; InternalIterator* iter; if (i == 0) { - iter = GetMemTable()->NewIterator(ReadOptions(), &arena); + iter = GetMemTable()->NewIterator( + ReadOptions(), /*seqno_to_time_mapping=*/nullptr, &arena); arena_iter_guard.set(iter); } else { iter = GetMemTable()->NewRangeTombstoneIterator(
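
A note on the SuperVersion plumbing in this patch: whenever the DB-wide seqno-to-time mapping grows, InstallSeqnoToTimeMappingInSV snapshots it into a shared_ptr and installs a fresh SuperVersion in every column family; a later SuperVersion installed for any other reason simply shares the previous SuperVersion's pointer via ShareSeqnoToTimeMapping, so readers never see a partially updated mapping and no extra copies are made. The standalone sketch below models only that ownership handoff; SeqnoTimeMapping, ToySuperVersion and Install are illustrative stand-ins, not RocksDB types.

// Standalone sketch of the shared_ptr handoff used for the seqno-to-time
// mapping; all names here are illustrative, not RocksDB's real API.
#include <cstdint>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

struct SeqnoTimeMapping {
  // (seqno, unix_time) samples, ascending by seqno.
  std::vector<std::pair<uint64_t, uint64_t>> pairs;
};

struct ToySuperVersion {
  // Immutable snapshot shared by readers; never mutated after installation.
  std::shared_ptr<const SeqnoTimeMapping> seqno_to_time;
};

// Install a new "SuperVersion": if a fresh mapping snapshot is provided, move
// it in; otherwise carry over whatever the previous SuperVersion was sharing.
ToySuperVersion Install(const ToySuperVersion* prev,
                        std::shared_ptr<const SeqnoTimeMapping> fresh) {
  ToySuperVersion next;
  if (fresh) {
    next.seqno_to_time = std::move(fresh);     // new snapshot just recorded
  } else if (prev) {
    next.seqno_to_time = prev->seqno_to_time;  // shared, no copy made
  }
  return next;
}

int main() {
  auto snapshot = std::make_shared<SeqnoTimeMapping>();
  snapshot->pairs = {{10, 1700000000}, {20, 1700000100}};

  ToySuperVersion sv1 = Install(nullptr, snapshot);  // new mapping installed
  ToySuperVersion sv2 = Install(&sv1, nullptr);      // carried over unchanged
  std::cout << (sv1.seqno_to_time == sv2.seqno_to_time) << "\n";  // prints 1
  return 0;
}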
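
The various write_unix_time() overrides added here follow one recipe: pull the sequence number out of the internal key's packed footer (ExtractSequenceNumber), then ask the mapping for the most recent time recorded at or before that seqno, returning UINT64_MAX when no mapping is available and 0 when the seqno has been zeroed out. The sketch below approximates that lookup with simplified types; the memtable and SST iterator paths order these checks slightly differently, and GetProximalTimeBeforeSeqno's edge-case behavior is only mirrored loosely here.

// Illustrative lookup from sequence number to approximate write time.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <limits>
#include <utility>
#include <vector>

constexpr uint64_t kUnknownWriteTime = std::numeric_limits<uint64_t>::max();

// Extract the sequence number from an internal-key footer packed as
// (seqno << 8) | value_type, mirroring ExtractSequenceNumber() in the patch.
uint64_t SeqnoFromFooter(uint64_t packed_footer) { return packed_footer >> 8; }

// mapping holds sorted (seqno, unix_time) samples. Return the time recorded
// at the largest sampled seqno <= the queried one, i.e. a lower bound on the
// entry's write time.
uint64_t ProximalTimeBeforeSeqno(
    const std::vector<std::pair<uint64_t, uint64_t>>& mapping,
    uint64_t seqno) {
  if (seqno == 0) {
    return 0;  // seqno was zeroed out, e.g. after reaching the last level
  }
  if (mapping.empty()) {
    return kUnknownWriteTime;  // recording disabled or no samples yet
  }
  auto it = std::upper_bound(
      mapping.begin(), mapping.end(), seqno,
      [](uint64_t s, const std::pair<uint64_t, uint64_t>& p) {
        return s < p.first;
      });
  if (it == mapping.begin()) {
    return kUnknownWriteTime;  // entry predates the oldest recorded sample
  }
  return std::prev(it)->second;
}

int main() {
  std::vector<std::pair<uint64_t, uint64_t>> mapping = {
      {10, 1700000000}, {20, 1700000100}, {30, 1700000200}};
  // Seqno 25 falls between the samples at 20 and 30, so the time recorded at
  // seqno 20 is reported as the oldest possible write time.
  assert(ProximalTimeBeforeSeqno(mapping, SeqnoFromFooter((25ull << 8) | 1)) ==
         1700000100);
  assert(ProximalTimeBeforeSeqno(mapping, 5) == kUnknownWriteTime);
  assert(ProximalTimeBeforeSeqno(mapping, 0) == 0);
  return 0;
}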
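
On the application side, the property now yields the write time as an 8-byte fixed64, which the tests above decode with the internal GetFixed64 helper. The usage sketch below is against the public API only; the database path and the hand-rolled little-endian decoder are illustrative (the iterator.h comment points at DecodeU64Ts as the supported conversion), and it assumes a build that already contains this patch.

// Usage sketch (not part of the patch): reading the write-time property from
// a live iterator. The manual decoder matches PutFixed64's little-endian
// layout so no internal headers are needed.
#include <cstdint>
#include <iostream>
#include <limits>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"

namespace {
uint64_t DecodeFixed64LE(const std::string& s) {
  uint64_t v = 0;
  for (int i = 7; i >= 0; --i) {
    v = (v << 8) | static_cast<unsigned char>(s[i]);
  }
  return v;
}
}  // namespace

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Enable seqno-to-time recording so the property can report real times.
  options.preserve_internal_time_seconds = 3600;

  rocksdb::DB* raw_db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/write_time_demo", &raw_db);
  if (!s.ok()) return 1;
  std::unique_ptr<rocksdb::DB> db(raw_db);

  s = db->Put(rocksdb::WriteOptions(), "k1", "v1");
  if (!s.ok()) return 1;

  std::unique_ptr<rocksdb::Iterator> it(
      db->NewIterator(rocksdb::ReadOptions()));
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    std::string prop;
    if (it->GetProperty("rocksdb.iterator.write-time", &prop).ok() &&
        prop.size() >= 8) {
      uint64_t t = DecodeFixed64LE(prop);
      if (t == std::numeric_limits<uint64_t>::max()) {
        std::cout << it->key().ToString() << ": write time unknown\n";
      } else {
        std::cout << it->key().ToString() << ": written at ~" << t << "\n";
      }
    }
  }
  return 0;
}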