diff --git a/CMakeLists.txt b/CMakeLists.txt index f2bf831a3c..5d4dc5004d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -670,6 +670,7 @@ set(SOURCES db/range_del_aggregator.cc db/range_tombstone_fragmenter.cc db/repair.cc + db/seqno_to_time_mapping.cc db/snapshot_impl.cc db/table_cache.cc db/table_properties_collector.cc @@ -1296,6 +1297,7 @@ if(WITH_TESTS) db/perf_context_test.cc db/periodic_work_scheduler_test.cc db/plain_table_db_test.cc + db/seqno_time_test.cc db/prefix_test.cc db/range_del_aggregator_test.cc db/range_tombstone_fragmenter_test.cc diff --git a/HISTORY.md b/HISTORY.md index bc3dfcfbf9..36ffff24b6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * User can configure the new ColumnFamilyOptions `blob_cache` to enable/disable blob caching. * Either sharing the backend cache with the block cache or using a completely separate cache is supported. * A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`). +* Add experimental tiered compaction feature `AdvancedColumnFamilyOptions::preclude_last_level_data_seconds`, which makes sure the new data inserted within preclude_last_level_data_seconds won't be placed on cold tier (the feature is not complete). ### Public API changes * Add metadata related structs and functions in C API, including diff --git a/Makefile b/Makefile index a79ab71438..4a39fe09a3 100644 --- a/Makefile +++ b/Makefile @@ -1504,6 +1504,9 @@ db_table_properties_test: $(OBJ_DIR)/db/db_table_properties_test.o $(TEST_LIBRAR log_write_bench: $(OBJ_DIR)/util/log_write_bench.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) $(PROFILING_FLAGS) +seqno_time_test: $(OBJ_DIR)/db/seqno_time_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + plain_table_db_test: $(OBJ_DIR)/db/plain_table_db_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 48904020ce..b53c1860a2 100644 --- a/TARGETS +++ b/TARGETS @@ -85,6 +85,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", + "db/seqno_to_time_mapping.cc", "db/snapshot_impl.cc", "db/table_cache.cc", "db/table_properties_collector.cc", @@ -419,6 +420,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/range_del_aggregator.cc", "db/range_tombstone_fragmenter.cc", "db/repair.cc", + "db/seqno_to_time_mapping.cc", "db/snapshot_impl.cc", "db/table_cache.cc", "db/table_properties_collector.cc", @@ -5688,6 +5690,12 @@ cpp_unittest_wrapper(name="ribbon_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="seqno_time_test", + srcs=["db/seqno_time_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="sim_cache_test", srcs=["utilities/simulator_cache/sim_cache_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/builder.cc b/db/builder.cc index f0c24de6cd..7d8244af90 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -66,7 +66,8 @@ Status BuildTable( SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, const std::shared_ptr& io_tracer, - BlobFileCreationReason blob_creation_reason, EventLogger* event_logger, + BlobFileCreationReason blob_creation_reason, + const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, TableProperties* table_properties, Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low, @@ -260,6 +261,15 @@ Status BuildTable( if (!s.ok() || empty) { builder->Abandon(); } else { + std::string seqno_time_mapping_str; + seqno_to_time_mapping.Encode( + seqno_time_mapping_str, meta->fd.smallest_seqno, + meta->fd.largest_seqno, meta->file_creation_time); + builder->SetSeqnoTimeTableProperties( + seqno_time_mapping_str, + ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO + ? meta->file_creation_time + : meta->oldest_ancester_time); s = builder->Finish(); } if (io_status->ok()) { diff --git a/db/builder.h b/db/builder.h index 73acf59248..a028fd2ba3 100644 --- a/db/builder.h +++ b/db/builder.h @@ -9,7 +9,9 @@ #include #include #include + #include "db/range_tombstone_fragmenter.h" +#include "db/seqno_to_time_mapping.h" #include "db/table_properties_collector.h" #include "logging/event_logger.h" #include "options/cf_options.h" @@ -61,6 +63,7 @@ extern Status BuildTable( bool paranoid_file_checks, InternalStats* internal_stats, IOStatus* io_status, const std::shared_ptr& io_tracer, BlobFileCreationReason blob_creation_reason, + const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, diff --git a/db/column_family.h b/db/column_family.h index a1d9060b36..91a8253742 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -524,6 +524,8 @@ class ColumnFamilyData { return file_metadata_cache_res_mgr_; } + SequenceNumber GetFirstMemtableSequenceNumber() const; + static const uint32_t kDummyColumnFamilyDataId; // Keep track of whether the mempurge feature was ever used. diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 54c58887c5..620ae53f2b 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -33,7 +33,8 @@ CompactionIterator::CompactionIterator( const Compaction* compaction, const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::shared_ptr info_log, - const std::string* full_history_ts_low) + const std::string* full_history_ts_low, + const SequenceNumber max_seqno_allow_zero_out) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, @@ -42,7 +43,8 @@ CompactionIterator::CompactionIterator( manual_compaction_canceled, std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), - compaction_filter, shutting_down, info_log, full_history_ts_low) {} + compaction_filter, shutting_down, info_log, full_history_ts_low, + max_seqno_allow_zero_out) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -58,7 +60,8 @@ CompactionIterator::CompactionIterator( const CompactionFilter* compaction_filter, const std::atomic* shutting_down, const std::shared_ptr info_log, - const std::string* full_history_ts_low) + const std::string* full_history_ts_low, + const SequenceNumber max_seqno_allow_zero_out) : input_(input, cmp, !compaction || compaction->DoesInputReferenceBlobFiles()), cmp_(cmp), @@ -92,7 +95,8 @@ CompactionIterator::CompactionIterator( CreatePrefetchBufferCollectionIfNeeded(compaction_.get())), current_key_committed_(false), cmp_with_history_ts_low_(0), - level_(compaction_ == nullptr ? 0 : compaction_->level()) { + level_(compaction_ == nullptr ? 0 : compaction_->level()), + max_seqno_allow_zero_out_(max_seqno_allow_zero_out) { assert(snapshots_ != nullptr); bottommost_level_ = compaction_ == nullptr ? false @@ -1148,7 +1152,8 @@ void CompactionIterator::PrepareOutput() { !compaction_->allow_ingest_behind() && bottommost_level_ && DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && - !output_to_penultimate_level_) { + !output_to_penultimate_level_ && + ikey_.sequence < max_seqno_allow_zero_out_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index 9c4dbab600..0f2d9882d6 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -181,42 +181,40 @@ class CompactionIterator { const Compaction* compaction_; }; - CompactionIterator(InternalIterator* input, const Comparator* cmp, - MergeHelper* merge_helper, SequenceNumber last_sequence, - std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SequenceNumber job_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, - bool allow_data_in_errors, - bool enforce_single_del_contracts, - const std::atomic& manual_compaction_canceled, - const Compaction* compaction = nullptr, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, + Env* env, bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + bool enforce_single_del_contracts, + const std::atomic& manual_compaction_canceled, + const Compaction* compaction = nullptr, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr, + const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber); // Constructor with custom CompactionProxy, used for tests. - CompactionIterator(InternalIterator* input, const Comparator* cmp, - MergeHelper* merge_helper, SequenceNumber last_sequence, - std::vector* snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SequenceNumber job_snapshot, - const SnapshotChecker* snapshot_checker, Env* env, - bool report_detailed_time, bool expect_valid_internal_key, - CompactionRangeDelAggregator* range_del_agg, - BlobFileBuilder* blob_file_builder, - bool allow_data_in_errors, - bool enforce_single_del_contracts, - const std::atomic& manual_compaction_canceled, - std::unique_ptr compaction, - const CompactionFilter* compaction_filter = nullptr, - const std::atomic* shutting_down = nullptr, - const std::shared_ptr info_log = nullptr, - const std::string* full_history_ts_low = nullptr); + CompactionIterator( + InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, + SequenceNumber last_sequence, std::vector* snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, + Env* env, bool report_detailed_time, bool expect_valid_internal_key, + CompactionRangeDelAggregator* range_del_agg, + BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, + bool enforce_single_del_contracts, + const std::atomic& manual_compaction_canceled, + std::unique_ptr compaction, + const CompactionFilter* compaction_filter = nullptr, + const std::atomic* shutting_down = nullptr, + const std::shared_ptr info_log = nullptr, + const std::string* full_history_ts_low = nullptr, + const SequenceNumber max_seqno_allow_zero_out = kMaxSequenceNumber); ~CompactionIterator(); @@ -446,6 +444,11 @@ class CompactionIterator { // output to. bool output_to_penultimate_level_{false}; + // any key later than this sequence number, need to keep the sequence number + // and not zeroed out. The sequence number is kept to track it's approximate + // time. + const SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber; + void AdvanceInputIter() { input_.Next(); } void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d609ac0b71..ad9413b858 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -223,12 +223,12 @@ void CompactionJob::Prepare() { // Generate file_levels_ for compaction before making Iterator auto* c = compact_->compaction; - assert(c->column_family_data() != nullptr); - assert(c->column_family_data()->current()->storage_info()->NumLevelFiles( + ColumnFamilyData* cfd = c->column_family_data(); + assert(cfd != nullptr); + assert(cfd->current()->storage_info()->NumLevelFiles( compact_->compaction->level()) > 0); - write_hint_ = - c->column_family_data()->CalculateSSTWriteHint(c->output_level()); + write_hint_ = cfd->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); if (c->ShouldFormSubcompactions()) { @@ -251,6 +251,43 @@ void CompactionJob::Prepare() { compact_->sub_compact_states.emplace_back(c, start, end, /*sub_job_id*/ 0); } + + if (c->immutable_options()->preclude_last_level_data_seconds > 0) { + // TODO(zjay): move to a function + seqno_time_mapping_.SetMaxTimeDuration( + c->immutable_options()->preclude_last_level_data_seconds); + // setup seqno_time_mapping_ + for (const auto& each_level : *c->inputs()) { + for (const auto& fmd : each_level.files) { + std::shared_ptr tp; + Status s = cfd->current()->GetTableProperties(&tp, fmd, nullptr); + if (s.ok()) { + seqno_time_mapping_.Add(tp->seqno_to_time_mapping) + .PermitUncheckedError(); + seqno_time_mapping_.Add(fmd->fd.smallest_seqno, + fmd->oldest_ancester_time); + } + } + } + + auto status = seqno_time_mapping_.Sort(); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Invalid sequence number to time mapping: Status: %s", + status.ToString().c_str()); + } + int64_t _current_time = 0; + status = db_options_.clock->GetCurrentTime(&_current_time); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to get current time in compaction: Status: %s", + status.ToString().c_str()); + max_seqno_allow_zero_out_ = 0; + } else { + max_seqno_allow_zero_out_ = + seqno_time_mapping_.TruncateOldEntries(_current_time); + } + } } struct RangeWithSize { @@ -989,7 +1026,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, sub_compact->compaction, compaction_filter, shutting_down_, - db_options_.info_log, full_history_ts_low); + db_options_.info_log, full_history_ts_low, max_seqno_allow_zero_out_); c_iter->SeekToFirst(); // Assign range delete aggregator to the target output level, which makes sure @@ -1253,7 +1290,7 @@ Status CompactionJob::FinishCompactionOutputFile( const uint64_t current_entries = outputs.NumEntries(); - s = outputs.Finish(s); + s = outputs.Finish(s, seqno_time_mapping_); if (s.ok()) { // With accurate smallest and largest key, we can get a slightly more @@ -1617,9 +1654,8 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact, sub_compact->compaction->output_compression_opts(), cfd->GetID(), cfd->GetName(), sub_compact->compaction->output_level(), bottommost_level_, TableFileCreationReason::kCompaction, - oldest_ancester_time, 0 /* oldest_key_time */, current_time, db_id_, - db_session_id_, sub_compact->compaction->max_output_file_size(), - file_number); + 0 /* oldest_key_time */, current_time, db_id_, db_session_id_, + sub_compact->compaction->max_output_file_size(), file_number); outputs.NewBuilder(tboptions); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 4c3ab42068..2dc2101985 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -27,6 +27,7 @@ #include "db/log_writer.h" #include "db/memtable_list.h" #include "db/range_del_aggregator.h" +#include "db/seqno_to_time_mapping.h" #include "db/version_edit.h" #include "db/write_controller.h" #include "db/write_thread.h" @@ -299,6 +300,14 @@ class CompactionJob { uint64_t GetCompactionId(SubcompactionState* sub_compact) const; + // Stores the sequence number to time mapping gathered from all input files + // it also collects the smallest_seqno -> oldest_ancester_time from the SST. + SeqnoToTimeMapping seqno_time_mapping_; + + // If a sequence number larger than max_seqno_allow_zero_out_, it won't be + // zeroed out. The sequence number is kept to get approximate time of the key. + SequenceNumber max_seqno_allow_zero_out_ = kMaxSequenceNumber; + // Get table file name in where it's outputting to, which should also be in // `output_directory_`. virtual std::string GetTableFileName(uint64_t file_number); diff --git a/db/compaction/compaction_outputs.cc b/db/compaction/compaction_outputs.cc index 346df5aec4..849a583fb8 100644 --- a/db/compaction/compaction_outputs.cc +++ b/db/compaction/compaction_outputs.cc @@ -18,12 +18,19 @@ void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) { builder_.reset(NewTableBuilder(tboptions, file_writer_.get())); } -Status CompactionOutputs::Finish(const Status& intput_status) { +Status CompactionOutputs::Finish(const Status& intput_status, + const SeqnoToTimeMapping& seqno_time_mapping) { FileMetaData* meta = GetMetaData(); assert(meta != nullptr); Status s = intput_status; if (s.ok()) { + std::string seqno_time_mapping_str; + seqno_time_mapping.Encode(seqno_time_mapping_str, meta->fd.smallest_seqno, + meta->fd.largest_seqno, meta->file_creation_time); + builder_->SetSeqnoTimeTableProperties(seqno_time_mapping_str, + meta->oldest_ancester_time); s = builder_->Finish(); + } else { builder_->Abandon(); } diff --git a/db/compaction/compaction_outputs.h b/db/compaction/compaction_outputs.h index bfacc4b45f..6359249898 100644 --- a/db/compaction/compaction_outputs.h +++ b/db/compaction/compaction_outputs.h @@ -111,7 +111,8 @@ class CompactionOutputs { } // Finish the current output file - Status Finish(const Status& intput_status); + Status Finish(const Status& intput_status, + const SeqnoToTimeMapping& seqno_time_mapping); // Update output table properties from table builder void UpdateTableProperties() { diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 4f58b77030..8a5216d8d6 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -480,6 +480,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { #ifndef ROCKSDB_LITE if (periodic_work_scheduler_ != nullptr) { periodic_work_scheduler_->Unregister(this); + periodic_work_scheduler_->UnregisterRecordSeqnoTimeWorker(this); } #endif // !ROCKSDB_LITE @@ -791,6 +792,53 @@ Status DBImpl::StartPeriodicWorkScheduler() { #endif // !ROCKSDB_LITE } +Status DBImpl::RegisterRecordSeqnoTimeWorker() { +#ifndef ROCKSDB_LITE + if (!periodic_work_scheduler_) { + return Status::OK(); + } + uint64_t min_time_duration = std::numeric_limits::max(); + uint64_t max_time_duration = std::numeric_limits::min(); + { + InstrumentedMutexLock l(&mutex_); + + for (auto cfd : *versions_->GetColumnFamilySet()) { + uint64_t preclude_last_option = + cfd->ioptions()->preclude_last_level_data_seconds; + if (!cfd->IsDropped() && preclude_last_option > 0) { + min_time_duration = std::min(preclude_last_option, min_time_duration); + max_time_duration = std::max(preclude_last_option, max_time_duration); + } + } + if (min_time_duration == std::numeric_limits::max()) { + seqno_time_mapping_.Resize(0, 0); + } else { + seqno_time_mapping_.Resize(min_time_duration, max_time_duration); + } + } + + uint64_t seqno_time_cadence = 0; + if (min_time_duration != std::numeric_limits::max()) { + seqno_time_cadence = + min_time_duration / SeqnoToTimeMapping::kMaxSeqnoTimePairsPerCF; + } + + Status s = periodic_work_scheduler_->RegisterRecordSeqnoTimeWorker( + this, seqno_time_cadence); + if (s.IsNotSupported()) { + // TODO: Fix the timer cannot cancel and re-add the same task + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Updating seqno to time worker cadence is not supported yet, to make " + "the change effective, please reopen the DB instance."); + s = Status::OK(); + } + return s; +#else + return Status::OK(); +#endif // !ROCKSDB_LITE +} + // esitmate the total size of stats_history_ size_t DBImpl::EstimateInMemoryStatsHistorySize() const { size_t size_total = @@ -2805,6 +2853,14 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, } } // InstrumentedMutexLock l(&mutex_) + if (cf_options.preclude_last_level_data_seconds > 0) { + // TODO(zjay): Fix the timer issue and re-enable this. + ROCKS_LOG_ERROR( + immutable_db_options_.info_log, + "Creating column family with `preclude_last_level_data_seconds` needs " + "to restart DB to take effect"); + // s = RegisterRecordSeqnoTimeWorker(); + } sv_context.Clean(); // this is outside the mutex if (s.ok()) { @@ -2893,6 +2949,10 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) { bg_cv_.SignalAll(); } + if (cfd->ioptions()->preclude_last_level_data_seconds > 0) { + s = RegisterRecordSeqnoTimeWorker(); + } + if (s.ok()) { // Note that here we erase the associated cf_info of the to-be-dropped // cfd before its ref-count goes to zero to avoid having to erase cf_info @@ -5536,6 +5596,26 @@ Status DBImpl::GetCreationTimeOfOldestFile(uint64_t* creation_time) { return Status::NotSupported("This API only works if max_open_files = -1"); } } + +void DBImpl::RecordSeqnoToTimeMapping() { + // Get time first then sequence number, so the actual time of seqno is <= + // unix_time recorded + int64_t unix_time = 0; + immutable_db_options_.clock->GetCurrentTime(&unix_time) + .PermitUncheckedError(); // Ignore error + SequenceNumber seqno = GetLatestSequenceNumber(); + bool appended = false; + { + InstrumentedMutexLock l(&mutex_); + appended = seqno_time_mapping_.Append(seqno, unix_time); + } + if (!appended) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "Failed to insert sequence number to time entry: %" PRIu64 + " -> %" PRIu64, + seqno, unix_time); + } +} #endif // ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8fc0640099..711ebfe040 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -36,6 +36,7 @@ #include "db/pre_release_callback.h" #include "db/range_del_aggregator.h" #include "db/read_callback.h" +#include "db/seqno_to_time_mapping.h" #include "db/snapshot_checker.h" #include "db/snapshot_impl.h" #include "db/trim_history_scheduler.h" @@ -1158,7 +1159,8 @@ class DBImpl : public DB { int TEST_BGCompactionsAllowed() const; int TEST_BGFlushesAllowed() const; size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - void TEST_WaitForStatsDumpRun(std::function callback) const; + void TEST_WaitForPeridicWorkerRun(std::function callback) const; + const SeqnoToTimeMapping& TEST_GetSeqnoToTimeMapping() const; size_t TEST_EstimateInMemoryStatsHistorySize() const; uint64_t TEST_GetCurrentLogNumber() const { @@ -1186,6 +1188,9 @@ class DBImpl : public DB { // flush LOG out of application buffer void FlushInfoLog(); + // record current sequence number to time mapping + void RecordSeqnoToTimeMapping(); + // Interface to block and signal the DB in case of stalling writes by // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface. // When DB needs to be blocked or signalled by WriteBufferManager, @@ -2069,6 +2074,8 @@ class DBImpl : public DB { // Schedule background tasks Status StartPeriodicWorkScheduler(); + Status RegisterRecordSeqnoTimeWorker(); + void PrintStatistics(); size_t EstimateInMemoryStatsHistorySize() const; @@ -2586,6 +2593,8 @@ class DBImpl : public DB { // Pointer to WriteBufferManager stalling interface. std::unique_ptr wbm_stall_; + + SeqnoToTimeMapping seqno_time_mapping_; }; class GetWithTimestampReadCallback : public ReadCallback { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 0db65301ba..136d394cc6 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -212,8 +212,8 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, - io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(), - &blob_callback_); + io_tracer_, seqno_time_mapping_, db_id_, db_session_id_, + cfd->GetFullHistoryTsLow(), &blob_callback_); FileMetaData file_meta; Status s; @@ -450,7 +450,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, false /* sync_output_directory */, false /* write_manifest */, - thread_pri, io_tracer_, db_id_, db_session_id_, + thread_pri, io_tracer_, seqno_time_mapping_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(), &blob_callback_)); } diff --git a/db/db_impl/db_impl_debug.cc b/db/db_impl/db_impl_debug.cc index 5647aa26f2..b70959c353 100644 --- a/db/db_impl/db_impl_debug.cc +++ b/db/db_impl/db_impl_debug.cc @@ -302,7 +302,8 @@ size_t DBImpl::TEST_GetWalPreallocateBlockSize( } #ifndef ROCKSDB_LITE -void DBImpl::TEST_WaitForStatsDumpRun(std::function callback) const { +void DBImpl::TEST_WaitForPeridicWorkerRun( + std::function callback) const { if (periodic_work_scheduler_ != nullptr) { static_cast(periodic_work_scheduler_) ->TEST_WaitForRun(callback); @@ -312,6 +313,11 @@ void DBImpl::TEST_WaitForStatsDumpRun(std::function callback) const { PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const { return static_cast(periodic_work_scheduler_); } + +const SeqnoToTimeMapping& DBImpl::TEST_GetSeqnoToTimeMapping() const { + return seqno_time_mapping_; +} + #endif // !ROCKSDB_LITE size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index b8b78a8c82..c854c3870b 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1554,17 +1554,19 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(), 0 /* level */, false /* is_bottommost */, - TableFileCreationReason::kRecovery, current_time, - 0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_, - db_session_id_, 0 /* target_file_size */, meta.fd.GetNumber()); + TableFileCreationReason::kRecovery, 0 /* oldest_key_time */, + 0 /* file_creation_time */, db_id_, db_session_id_, + 0 /* target_file_size */, meta.fd.GetNumber()); + SeqnoToTimeMapping empty_seqno_time_mapping; s = BuildTable( dbname_, versions_.get(), immutable_db_options_, tboptions, file_options_for_compaction_, cfd->table_cache(), iter.get(), std::move(range_del_iters), &meta, &blob_file_additions, snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber, snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s, - io_tracer_, BlobFileCreationReason::kRecovery, &event_logger_, job_id, - Env::IO_HIGH, nullptr /* table_properties */, write_hint, + io_tracer_, BlobFileCreationReason::kRecovery, + empty_seqno_time_mapping, &event_logger_, job_id, Env::IO_HIGH, + nullptr /* table_properties */, write_hint, nullptr /*full_history_ts_low*/, &blob_callback_); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, @@ -2106,6 +2108,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { s = impl->StartPeriodicWorkScheduler(); } + + if (s.ok()) { + s = impl->RegisterRecordSeqnoTimeWorker(); + } if (!s.ok()) { for (auto* h : *handles) { delete h; diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 3ec0e8da1b..8883183c33 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -148,7 +148,8 @@ void EventHelpers::LogAndNotifyTableFileCreationFinished( << table_properties.fast_compression_estimated_data_size << "db_id" << table_properties.db_id << "db_session_id" << table_properties.db_session_id << "orig_file_number" - << table_properties.orig_file_number; + << table_properties.orig_file_number << "seqno_to_time_mapping" + << table_properties.seqno_to_time_mapping; // user collected properties for (const auto& prop : table_properties.readable_properties) { diff --git a/db/flush_job.cc b/db/flush_job.cc index 57d3a510fa..f63c090083 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -95,8 +95,9 @@ FlushJob::FlushJob( Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, - const std::string& db_id, const std::string& db_session_id, - std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) + const SeqnoToTimeMapping& seqno_time_mapping, const std::string& db_id, + const std::string& db_session_id, std::string full_history_ts_low, + BlobFileCompletionCallback* blob_callback) : dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), @@ -128,7 +129,8 @@ FlushJob::FlushJob( io_tracer_(io_tracer), clock_(db_options_.clock), full_history_ts_low_(std::move(full_history_ts_low)), - blob_callback_(blob_callback) { + blob_callback_(blob_callback), + db_impl_seqno_time_mapping_(seqno_time_mapping) { // Update the thread status to indicate flush. ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -814,6 +816,11 @@ Status FlushJob::WriteLevel0Table() { const uint64_t start_cpu_micros = clock_->CPUMicros(); Status s; + SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber(); + if (!db_impl_seqno_time_mapping_.Empty()) { + seqno_to_time_mapping_ = db_impl_seqno_time_mapping_.Copy(smallest_seqno); + } + std::vector blob_file_additions; { @@ -902,18 +909,13 @@ Status FlushJob::WriteLevel0Table() { "FlushJob::WriteLevel0Table:oldest_ancester_time", &oldest_ancester_time); meta_.oldest_ancester_time = oldest_ancester_time; - meta_.file_creation_time = current_time; - uint64_t creation_time = (cfd_->ioptions()->compaction_style == - CompactionStyle::kCompactionStyleFIFO) - ? current_time - : meta_.oldest_ancester_time; - uint64_t num_input_entries = 0; uint64_t memtable_payload_bytes = 0; uint64_t memtable_garbage_bytes = 0; IOStatus io_s; + const std::string* const full_history_ts_low = (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_; TableBuilderOptions tboptions( @@ -921,8 +923,8 @@ Status FlushJob::WriteLevel0Table() { cfd_->int_tbl_prop_collector_factories(), output_compression_, mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(), 0 /* level */, false /* is_bottommost */, - TableFileCreationReason::kFlush, creation_time, oldest_key_time, - current_time, db_id_, db_session_id_, 0 /* target_file_size */, + TableFileCreationReason::kFlush, oldest_key_time, current_time, + db_id_, db_session_id_, 0 /* target_file_size */, meta_.fd.GetNumber()); const SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence(); @@ -933,10 +935,10 @@ Status FlushJob::WriteLevel0Table() { earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), &io_s, io_tracer_, - BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id, - io_priority, &table_properties_, write_hint, full_history_ts_low, - blob_callback_, &num_input_entries, &memtable_payload_bytes, - &memtable_garbage_bytes); + BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_, + job_context_->job_id, io_priority, &table_properties_, write_hint, + full_history_ts_low, blob_callback_, &num_input_entries, + &memtable_payload_bytes, &memtable_garbage_bytes); // TODO: Cleanup io_status in BuildTable and table builders assert(!s.ok() || io_s.ok()); io_s.PermitUncheckedError(); diff --git a/db/flush_job.h b/db/flush_job.h index 33c51e6459..60c272aec3 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -25,6 +25,7 @@ #include "db/log_writer.h" #include "db/logs_with_prep_tracker.h" #include "db/memtable_list.h" +#include "db/seqno_to_time_mapping.h" #include "db/snapshot_impl.h" #include "db/version_edit.h" #include "db/write_controller.h" @@ -72,6 +73,7 @@ class FlushJob { EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const SeqnoToTimeMapping& seq_time_mapping, const std::string& db_id = "", const std::string& db_session_id = "", std::string full_history_ts_low = "", BlobFileCompletionCallback* blob_callback = nullptr); @@ -191,6 +193,11 @@ class FlushJob { const std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; + + // reference to the seqno_time_mapping_ in db_impl.h, not safe to read without + // db mutex + const SeqnoToTimeMapping& db_impl_seqno_time_mapping_; + SeqnoToTimeMapping seqno_to_time_mapping_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index c9824f490b..3e67f09fe8 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -148,6 +148,8 @@ class FlushJobTestBase : public testing::Test { InstrumentedMutex mutex_; std::atomic shutting_down_; std::shared_ptr mock_table_factory_; + + SeqnoToTimeMapping empty_seqno_to_time_mapping_; }; class FlushJobTest : public FlushJobTestBase { @@ -162,14 +164,15 @@ TEST_F(FlushJobTest, Empty) { auto cfd = versions_->GetColumnFamilySet()->GetDefault(); EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relavant - FlushJob flush_job( - dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, - *cfd->GetLatestMutableCFOptions(), - std::numeric_limits::max() /* memtable_id */, env_options_, - versions_.get(), &mutex_, &shutting_down_, {}, kMaxSequenceNumber, - snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, - nullptr, &event_logger, false, true /* sync_output_directory */, - true /* write_manifest */, Env::Priority::USER, nullptr /*IOTracer*/); + FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), + db_options_, *cfd->GetLatestMutableCFOptions(), + std::numeric_limits::max() /* memtable_id */, + env_options_, versions_.get(), &mutex_, &shutting_down_, + {}, kMaxSequenceNumber, snapshot_checker, &job_context, + nullptr, nullptr, nullptr, kNoCompression, nullptr, + &event_logger, false, true /* sync_output_directory */, + true /* write_manifest */, Env::Priority::USER, + nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); { InstrumentedMutexLock l(&mutex_); flush_job.PickMemTable(); @@ -254,7 +257,7 @@ TEST_F(FlushJobTest, NonEmpty) { snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/); + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); HistogramData hist; FileMetaData file_meta; @@ -316,7 +319,7 @@ TEST_F(FlushJobTest, FlushMemTablesSingleColumnFamily) { snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/); + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); HistogramData hist; FileMetaData file_meta; mutex_.Lock(); @@ -389,7 +392,8 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, false /* sync_output_directory */, false /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/)); + Env::Priority::USER, nullptr /*IOTracer*/, + empty_seqno_to_time_mapping_)); k++; } HistogramData hist; @@ -516,7 +520,7 @@ TEST_F(FlushJobTest, Snapshots) { snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/); + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); mutex_.Lock(); flush_job.PickMemTable(); ASSERT_OK(flush_job.Run()); @@ -571,7 +575,7 @@ TEST_F(FlushJobTest, GetRateLimiterPriorityForWrite) { snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/); + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_); // When the state from WriteController is normal. ASSERT_EQ(flush_job.GetRateLimiterPriorityForWrite(), Env::IO_HIGH); @@ -650,7 +654,8 @@ TEST_F(FlushJobTimestampTest, AllKeysExpired) { snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"", + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, + /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low); FileMetaData fmeta; @@ -701,7 +706,8 @@ TEST_F(FlushJobTimestampTest, NoKeyExpired) { snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, true /* sync_output_directory */, true /* write_manifest */, - Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"", + Env::Priority::USER, nullptr /*IOTracer*/, empty_seqno_to_time_mapping_, + /*db_id=*/"", /*db_session_id=*/"", full_history_ts_low); FileMetaData fmeta; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index f447ee7353..72f309fedf 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -255,6 +255,15 @@ SequenceNumber MemTableListVersion::GetEarliestSequenceNumber( } } +SequenceNumber MemTableListVersion::GetFirstSequenceNumber() const { + SequenceNumber min_first_seqno = kMaxSequenceNumber; + // The first memtable in the list might not be the oldest one with mempurge + for (const auto& m : memlist_) { + min_first_seqno = std::min(m->GetFirstSequenceNumber(), min_first_seqno); + } + return min_first_seqno; +} + // caller is responsible for referencing m void MemTableListVersion::Add(MemTable* m, autovector* to_delete) { assert(refs_ == 1); // only when refs_ == 1 is MemTableListVersion mutable diff --git a/db/memtable_list.h b/db/memtable_list.h index 866ecccb6f..0f75d3055b 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -129,6 +129,11 @@ class MemTableListVersion { // History. SequenceNumber GetEarliestSequenceNumber(bool include_history = false) const; + // Return the first sequence number from the memtable list, which is the + // smallest sequence number of all FirstSequenceNumber. + // Return kMaxSequenceNumber if the list is empty. + SequenceNumber GetFirstSequenceNumber() const; + private: friend class MemTableList; diff --git a/db/periodic_work_scheduler.cc b/db/periodic_work_scheduler.cc index 904b847f15..c2742cb49e 100644 --- a/db/periodic_work_scheduler.cc +++ b/db/periodic_work_scheduler.cc @@ -11,6 +11,11 @@ #ifndef ROCKSDB_LITE namespace ROCKSDB_NAMESPACE { +const std::string PeriodicWorkTaskNames::kDumpStats = "dump_st"; +const std::string PeriodicWorkTaskNames::kPersistStats = "pst_st"; +const std::string PeriodicWorkTaskNames::kFlushInfoLog = "flush_info_log"; +const std::string PeriodicWorkTaskNames::kRecordSeqnoTime = "record_seq_time"; + PeriodicWorkScheduler::PeriodicWorkScheduler( const std::shared_ptr& clock) { timer = std::unique_ptr(new Timer(clock.get())); @@ -24,7 +29,8 @@ Status PeriodicWorkScheduler::Register(DBImpl* dbi, timer->Start(); if (stats_dump_period_sec > 0) { bool succeeded = timer->Add( - [dbi]() { dbi->DumpStats(); }, GetTaskName(dbi, "dump_st"), + [dbi]() { dbi->DumpStats(); }, + GetTaskName(dbi, PeriodicWorkTaskNames::kDumpStats), initial_delay.fetch_add(1) % static_cast(stats_dump_period_sec) * kMicrosInSecond, static_cast(stats_dump_period_sec) * kMicrosInSecond); @@ -34,7 +40,8 @@ Status PeriodicWorkScheduler::Register(DBImpl* dbi, } if (stats_persist_period_sec > 0) { bool succeeded = timer->Add( - [dbi]() { dbi->PersistStats(); }, GetTaskName(dbi, "pst_st"), + [dbi]() { dbi->PersistStats(); }, + GetTaskName(dbi, PeriodicWorkTaskNames::kPersistStats), initial_delay.fetch_add(1) % static_cast(stats_persist_period_sec) * kMicrosInSecond, static_cast(stats_persist_period_sec) * kMicrosInSecond); @@ -42,22 +49,58 @@ Status PeriodicWorkScheduler::Register(DBImpl* dbi, return Status::Aborted("Unable to add periodic task PersistStats"); } } - bool succeeded = timer->Add( - [dbi]() { dbi->FlushInfoLog(); }, GetTaskName(dbi, "flush_info_log"), - initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec * - kMicrosInSecond, - kDefaultFlushInfoLogPeriodSec * kMicrosInSecond); + bool succeeded = + timer->Add([dbi]() { dbi->FlushInfoLog(); }, + GetTaskName(dbi, PeriodicWorkTaskNames::kFlushInfoLog), + initial_delay.fetch_add(1) % kDefaultFlushInfoLogPeriodSec * + kMicrosInSecond, + kDefaultFlushInfoLogPeriodSec * kMicrosInSecond); if (!succeeded) { - return Status::Aborted("Unable to add periodic task PersistStats"); + return Status::Aborted("Unable to add periodic task FlushInfoLog"); } return Status::OK(); } +Status PeriodicWorkScheduler::RegisterRecordSeqnoTimeWorker( + DBImpl* dbi, uint64_t record_cadence_sec) { + MutexLock l(&timer_mu_); + if (record_seqno_time_cadence_ == record_cadence_sec) { + return Status::OK(); + } + if (record_cadence_sec == 0) { + timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime)); + record_seqno_time_cadence_ = record_cadence_sec; + return Status::OK(); + } + timer->Start(); + static std::atomic_uint64_t initial_delay(0); + bool succeeded = timer->Add( + [dbi]() { dbi->RecordSeqnoToTimeMapping(); }, + GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime), + initial_delay.fetch_add(1) % record_cadence_sec * kMicrosInSecond, + record_cadence_sec * kMicrosInSecond); + if (!succeeded) { + return Status::NotSupported( + "Updating seqno to time worker cadence is not supported yet"); + } + record_seqno_time_cadence_ = record_cadence_sec; + return Status::OK(); +} + +void PeriodicWorkScheduler::UnregisterRecordSeqnoTimeWorker(DBImpl* dbi) { + MutexLock l(&timer_mu_); + timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kRecordSeqnoTime)); + if (!timer->HasPendingTask()) { + timer->Shutdown(); + } + record_seqno_time_cadence_ = 0; +} + void PeriodicWorkScheduler::Unregister(DBImpl* dbi) { MutexLock l(&timer_mu_); - timer->Cancel(GetTaskName(dbi, "dump_st")); - timer->Cancel(GetTaskName(dbi, "pst_st")); - timer->Cancel(GetTaskName(dbi, "flush_info_log")); + timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kDumpStats)); + timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kPersistStats)); + timer->Cancel(GetTaskName(dbi, PeriodicWorkTaskNames::kFlushInfoLog)); if (!timer->HasPendingTask()) { timer->Shutdown(); } @@ -71,8 +114,8 @@ PeriodicWorkScheduler* PeriodicWorkScheduler::Default() { return &scheduler; } -std::string PeriodicWorkScheduler::GetTaskName(DBImpl* dbi, - const std::string& func_name) { +std::string PeriodicWorkScheduler::GetTaskName( + const DBImpl* dbi, const std::string& func_name) const { std::string db_session_id; // TODO: Should this error be ignored? dbi->GetDbSessionId(db_session_id).PermitUncheckedError(); @@ -117,6 +160,14 @@ size_t PeriodicWorkTestScheduler::TEST_GetValidTaskNum() const { return 0; } +bool PeriodicWorkTestScheduler::TEST_HasValidTask( + const DBImpl* dbi, const std::string& func_name) const { + if (timer == nullptr) { + return false; + } + return timer->TEST_HasVaildTask(GetTaskName(dbi, func_name)); +} + PeriodicWorkTestScheduler::PeriodicWorkTestScheduler( const std::shared_ptr& clock) : PeriodicWorkScheduler(clock) {} diff --git a/db/periodic_work_scheduler.h b/db/periodic_work_scheduler.h index e494c7eee6..8b4eb19598 100644 --- a/db/periodic_work_scheduler.h +++ b/db/periodic_work_scheduler.h @@ -32,8 +32,10 @@ class PeriodicWorkScheduler { Status Register(DBImpl* dbi, unsigned int stats_dump_period_sec, unsigned int stats_persist_period_sec); + Status RegisterRecordSeqnoTimeWorker(DBImpl* dbi, uint64_t record_cadence); void Unregister(DBImpl* dbi); + void UnregisterRecordSeqnoTimeWorker(DBImpl* dbi); // Periodically flush info log out of application buffer at a low frequency. // This improves debuggability in case of RocksDB hanging since it ensures the @@ -52,8 +54,12 @@ class PeriodicWorkScheduler { explicit PeriodicWorkScheduler(const std::shared_ptr& clock); + // Get the unique task name (prefix with db session id) + std::string GetTaskName(const DBImpl* dbi, + const std::string& func_name) const; + private: - std::string GetTaskName(DBImpl* dbi, const std::string& func_name); + uint64_t record_seqno_time_cadence_ = 0; }; #ifndef NDEBUG @@ -68,11 +74,20 @@ class PeriodicWorkTestScheduler : public PeriodicWorkScheduler { size_t TEST_GetValidTaskNum() const; + bool TEST_HasValidTask(const DBImpl* dbi, const std::string& func_name) const; + private: explicit PeriodicWorkTestScheduler(const std::shared_ptr& clock); }; #endif // !NDEBUG +struct PeriodicWorkTaskNames { + static const std::string kDumpStats; + static const std::string kPersistStats; + static const std::string kFlushInfoLog; + static const std::string kRecordSeqnoTime; +}; + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/db/periodic_work_scheduler_test.cc b/db/periodic_work_scheduler_test.cc index acddad9f8c..04771035c2 100644 --- a/db/periodic_work_scheduler_test.cc +++ b/db/periodic_work_scheduler_test.cc @@ -66,7 +66,7 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_GT(kPeriodSec, 1u); - dbfull()->TEST_WaitForStatsDumpRun([&] { + dbfull()->TEST_WaitForPeridicWorkerRun([&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec) - 1); }); @@ -78,14 +78,14 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(1, pst_st_counter); ASSERT_EQ(1, flush_info_log_counter); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(2, dump_st_counter); ASSERT_EQ(2, pst_st_counter); ASSERT_EQ(2, flush_info_log_counter); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(3, dump_st_counter); @@ -99,7 +99,7 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_EQ(0u, dbfull()->GetDBOptions().stats_persist_period_sec); // Info log flush should still run. - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(3, dump_st_counter); ASSERT_EQ(3, pst_st_counter); @@ -117,7 +117,7 @@ TEST_F(PeriodicWorkSchedulerTest, Basic) { ASSERT_NE(nullptr, scheduler); ASSERT_EQ(2, scheduler->TEST_GetValidTaskNum()); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(kPeriodSec)); }); ASSERT_EQ(4, dump_st_counter); ASSERT_EQ(3, pst_st_counter); @@ -157,19 +157,19 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { ASSERT_EQ(kInstanceNum * 3, scheduler->TEST_GetValidTaskNum()); int expected_run = kInstanceNum; - dbi->TEST_WaitForStatsDumpRun( + dbi->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); expected_run += kInstanceNum; - dbi->TEST_WaitForStatsDumpRun( + dbi->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); expected_run += kInstanceNum; - dbi->TEST_WaitForStatsDumpRun( + dbi->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); @@ -181,9 +181,9 @@ TEST_F(PeriodicWorkSchedulerTest, MultiInstances) { expected_run += (kInstanceNum - half) * 2; - dbi->TEST_WaitForStatsDumpRun( + dbi->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); - dbi->TEST_WaitForStatsDumpRun( + dbi->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_EQ(expected_run, dump_st_counter); ASSERT_EQ(expected_run, pst_st_counter); diff --git a/db/repair.cc b/db/repair.cc index 81a2c81ba6..256024571e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -425,6 +425,7 @@ class Repairer { immutable_db_options_.clock->GetCurrentTime(&_current_time) .PermitUncheckedError(); // ignore error const uint64_t current_time = static_cast(_current_time); + meta.file_creation_time = current_time; SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance(); auto write_hint = cfd->CalculateSSTWriteHint(0); @@ -443,10 +444,11 @@ class Repairer { cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), kNoCompression, default_compression, cfd->GetID(), cfd->GetName(), -1 /* level */, false /* is_bottommost */, - TableFileCreationReason::kRecovery, current_time, - 0 /* oldest_key_time */, 0 /* file_creation_time */, - "DB Repairer" /* db_id */, db_session_id_, 0 /*target_file_size*/, - meta.fd.GetNumber()); + TableFileCreationReason::kRecovery, 0 /* oldest_key_time */, + 0 /* file_creation_time */, "DB Repairer" /* db_id */, db_session_id_, + 0 /*target_file_size*/, meta.fd.GetNumber()); + + SeqnoToTimeMapping empty_seqno_time_mapping; status = BuildTable( dbname_, /* versions */ nullptr, immutable_db_options_, tboptions, file_options_, table_cache_.get(), iter.get(), @@ -454,8 +456,8 @@ class Repairer { {}, kMaxSequenceNumber, kMaxSequenceNumber, snapshot_checker, false /* paranoid_file_checks*/, nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery, - nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, - nullptr /* table_properties */, write_hint); + empty_seqno_time_mapping, nullptr /* event_logger */, 0 /* job_id */, + Env::IO_HIGH, nullptr /* table_properties */, write_hint); ROCKS_LOG_INFO(db_options_.info_log, "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, meta.fd.GetNumber(), diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc new file mode 100644 index 0000000000..fa24c14ab4 --- /dev/null +++ b/db/seqno_time_test.cc @@ -0,0 +1,612 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "db/periodic_work_scheduler.h" +#include "db/seqno_to_time_mapping.h" +#include "port/stack_trace.h" +#include "test_util/mock_time_env.h" + +#ifndef ROCKSDB_LITE + +namespace ROCKSDB_NAMESPACE { + +class SeqnoTimeTest : public DBTestBase { + public: + SeqnoTimeTest() : DBTestBase("seqno_time_test", /*env_do_fsync=*/false) { + mock_clock_ = std::make_shared(env_->GetSystemClock()); + mock_env_ = std::make_unique(env_, mock_clock_); + } + + protected: + std::unique_ptr mock_env_; + std::shared_ptr mock_clock_; + + void SetUp() override { + mock_clock_->InstallTimedWaitFixCallback(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::StartPeriodicWorkScheduler:Init", [&](void* arg) { + auto* periodic_work_scheduler_ptr = + reinterpret_cast(arg); + *periodic_work_scheduler_ptr = + PeriodicWorkTestScheduler::Default(mock_clock_); + }); + } +}; + +TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { + Options options = CurrentOptions(); + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + + std::set checked_file_nums; + SequenceNumber start_seq = dbfull()->GetLatestSequenceNumber(); + // Write a key every 10 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + } + ASSERT_OK(Flush()); + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + auto it = tables_props.begin(); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 19); + ASSERT_LE(seqs.size(), 21); + SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber(); + for (auto i = start_seq; i < start_seq + 10; i++) { + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i + 1) * 10); + } + start_seq += 10; + for (auto i = start_seq; i < seq_end; i++) { + // The result is within the range + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), (i - 10) * 10); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i + 10) * 10); + } + checked_file_nums.insert(it->second->orig_file_number); + start_seq = seq_end; + + // Write a key every 1 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i + 190), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(1)); }); + } + seq_end = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Flush()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 2); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + // There only a few time sample + ASSERT_GE(seqs.size(), 1); + ASSERT_LE(seqs.size(), 3); + for (auto i = start_seq; i < seq_end; i++) { + // The result is not very accurate, as there is more data write within small + // range of time + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 1000); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + } + checked_file_nums.insert(it->second->orig_file_number); + start_seq = seq_end; + + // Write a key every 200 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i + 380), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(200)); }); + } + seq_end = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Flush()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 3); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + // The sequence number -> time entries should be maxed + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + for (auto i = start_seq; i < seq_end - 99; i++) { + // likely the first 100 entries reports 0 + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + } + start_seq += 101; + + for (auto i = start_seq; i < seq_end; i++) { + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 200 + 22200); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 200 + 22600); + } + checked_file_nums.insert(it->second->orig_file_number); + start_seq = seq_end; + + // Write a key every 100 seconds + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i + 570), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + seq_end = dbfull()->GetLatestSequenceNumber(); + ASSERT_OK(Flush()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 4); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + + checked_file_nums.insert(it->second->orig_file_number); + + // re-enable compaction + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"}, + })); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_GE(tables_props.size(), 1); + it = tables_props.begin(); + while (it != tables_props.end()) { + if (!checked_file_nums.count(it->second->orig_file_number)) { + break; + } + it++; + } + ASSERT_TRUE(it != tables_props.end()); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + for (auto i = start_seq; i < seq_end - 99; i++) { + // likely the first 100 entries reports 0 + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + } + start_seq += 101; + + for (auto i = start_seq; i < seq_end; i++) { + ASSERT_GE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 52200); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 52400); + } + ASSERT_OK(db_->Close()); +} + +// TODO(zjay): Disabled, until New CF bug with preclude_last_level_data_seconds +// is fixed +TEST_F(SeqnoTimeTest, DISABLED_MultiCFs) { + Options options = CurrentOptions(); + options.preclude_last_level_data_seconds = 0; + options.env = mock_env_.get(); + options.stats_dump_period_sec = 0; + options.stats_persist_period_sec = 0; + ReopenWithColumnFamilies({"default"}, options); + + auto scheduler = dbfull()->TEST_GetPeriodicWorkScheduler(); + ASSERT_FALSE(scheduler->TEST_HasValidTask( + dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + + // Write some data and increase the current time + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + ASSERT_OK(Flush()); + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + auto it = tables_props.begin(); + ASSERT_TRUE(it->second->seqno_to_time_mapping.empty()); + + ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty()); + + Options options_1 = options; + options_1.preclude_last_level_data_seconds = 10000; // 10k + CreateColumnFamilies({"one"}, options_1); + ASSERT_TRUE(scheduler->TEST_HasValidTask( + dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + + // Write some data to the default CF (without preclude_last_level feature) + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + ASSERT_OK(Flush()); + + // in memory mapping won't increase because CFs with preclude_last_level + // feature doesn't have memtable + auto queue = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); + ASSERT_LT(queue.size(), 5); + + // Write some data to the CF one + for (int i = 0; i < 20; i++) { + ASSERT_OK(Put(1, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + } + ASSERT_OK(Flush(1)); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[1], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 1); + ASSERT_LE(seqs.size(), 3); + + // Create one more CF with larger preclude_last_level time + Options options_2 = options; + options_2.preclude_last_level_data_seconds = 1000000; // 1m + CreateColumnFamilies({"two"}, options_2); + + // Add more data to CF "two" to fill the in memory mapping + for (int i = 0; i < 2000; i++) { + ASSERT_OK(Put(2, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 1000 - 1); + ASSERT_LE(seqs.size(), 1000 + 1); + + ASSERT_OK(Flush(2)); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[2], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + // the max encoded entries is 100 + ASSERT_GE(seqs.size(), 100 - 1); + ASSERT_LE(seqs.size(), 100 + 1); + + // Write some data to default CF, as all memtable with preclude_last_level + // enabled have flushed, the in-memory seqno->time mapping should be cleared + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(0, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + seqs = dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping(); + ASSERT_LE(seqs.size(), 5); + ASSERT_OK(Flush(0)); + + // trigger compaction for CF "two" and make sure the compaction output has + // seqno_to_time_mapping + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(2, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + ASSERT_OK(Flush(2)); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[2], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + tp_mapping.Clear(); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 99); + ASSERT_LE(seqs.size(), 101); + + for (int j = 0; j < 2; j++) { + for (int i = 0; i < 200; i++) { + ASSERT_OK(Put(0, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + ASSERT_OK(Flush(0)); + } + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(handles_[0], &tables_props)); + ASSERT_EQ(tables_props.size(), 1); + it = tables_props.begin(); + ASSERT_TRUE(it->second->seqno_to_time_mapping.empty()); + + // Write some data to CF "two", but don't flush to accumulate + for (int i = 0; i < 1000; i++) { + ASSERT_OK(Put(2, Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(100)); }); + } + ASSERT_GE( + dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(), + 500); + // After dropping CF "one", the in-memory mapping will be change to only + // follow CF "two" options. + ASSERT_OK(db_->DropColumnFamily(handles_[1])); + ASSERT_LE( + dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(), + 100 + 5); + + // After dropping CF "two", the in-memory mapping is also clear. + ASSERT_OK(db_->DropColumnFamily(handles_[2])); + ASSERT_EQ( + dbfull()->TEST_GetSeqnoToTimeMapping().TEST_GetInternalMapping().size(), + 0); + + // And the timer worker is stopped + ASSERT_FALSE(scheduler->TEST_HasValidTask( + dbfull(), PeriodicWorkTaskNames::kRecordSeqnoTime)); + Close(); +} + +TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.preclude_last_level_data_seconds = 10000; + options.env = mock_env_.get(); + + DestroyAndReopen(options); + + for (int j = 0; j < 3; j++) { + for (int i = 0; i < 100; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + } + ASSERT_OK(Flush()); + } + TablePropertiesCollection tables_props; + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 3); + for (const auto& props : tables_props) { + ASSERT_FALSE(props.second->seqno_to_time_mapping.empty()); + SeqnoToTimeMapping tp_mapping; + ASSERT_OK(tp_mapping.Add(props.second->seqno_to_time_mapping)); + ASSERT_OK(tp_mapping.Sort()); + ASSERT_FALSE(tp_mapping.Empty()); + auto seqs = tp_mapping.TEST_GetInternalMapping(); + ASSERT_GE(seqs.size(), 10 - 1); + ASSERT_LE(seqs.size(), 10 + 1); + } + + // Trigger a compaction + for (int i = 0; i < 100; i++) { + ASSERT_OK(Put(Key(i), "value")); + dbfull()->TEST_WaitForPeridicWorkerRun( + [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); + } + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + tables_props.clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + ASSERT_EQ(tables_props.size(), 1); + + auto it = tables_props.begin(); + SeqnoToTimeMapping tp_mapping; + ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + Close(); +} + +TEST_F(SeqnoTimeTest, MappingAppend) { + SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + + // ignore seqno == 0, as it may mean the seqno is zeroed out + ASSERT_FALSE(test.Append(0, 9)); + + ASSERT_TRUE(test.Append(3, 10)); + auto size = test.Size(); + // normal add + ASSERT_TRUE(test.Append(10, 11)); + size++; + ASSERT_EQ(size, test.Size()); + + // Append unsorted + ASSERT_FALSE(test.Append(8, 12)); + ASSERT_EQ(size, test.Size()); + + // Append with the same seqno, newer time will be accepted + ASSERT_TRUE(test.Append(10, 12)); + ASSERT_EQ(size, test.Size()); + // older time will be ignored + ASSERT_FALSE(test.Append(10, 9)); + ASSERT_EQ(size, test.Size()); + + // new seqno with old time will be ignored + ASSERT_FALSE(test.Append(12, 8)); + ASSERT_EQ(size, test.Size()); +} + +TEST_F(SeqnoTimeTest, GetOldestApproximateTime) { + SeqnoToTimeMapping test(/*max_time_duration=*/100, /*max_capacity=*/10); + + ASSERT_EQ(test.GetOldestApproximateTime(10), kUnknownSeqnoTime); + + test.Append(3, 10); + + ASSERT_EQ(test.GetOldestApproximateTime(2), kUnknownSeqnoTime); + ASSERT_EQ(test.GetOldestApproximateTime(3), 10); + ASSERT_EQ(test.GetOldestApproximateTime(10), 10); + + test.Append(10, 100); + + test.Append(100, 1000); + ASSERT_EQ(test.GetOldestApproximateTime(10), 100); + ASSERT_EQ(test.GetOldestApproximateTime(40), 100); + ASSERT_EQ(test.GetOldestApproximateTime(111), 1000); +} + +TEST_F(SeqnoTimeTest, Sort) { + SeqnoToTimeMapping test; + + // single entry + test.Add(10, 11); + ASSERT_OK(test.Sort()); + ASSERT_EQ(test.Size(), 1); + + // duplicate, should be removed by sort + test.Add(10, 11); + // same seqno, but older time, should be removed + test.Add(10, 9); + + // unuseful ones, should be removed by sort + test.Add(11, 9); + test.Add(9, 8); + + // Good ones + test.Add(1, 10); + test.Add(100, 100); + + ASSERT_OK(test.Sort()); + + auto seqs = test.TEST_GetInternalMapping(); + + std::deque expected; + expected.emplace_back(1, 10); + expected.emplace_back(10, 11); + expected.emplace_back(100, 100); + + ASSERT_EQ(expected, seqs); +} + +TEST_F(SeqnoTimeTest, EncodeDecodeBasic) { + SeqnoToTimeMapping test(0, 1000); + + std::string output; + test.Encode(output, 0, 1000, 100); + ASSERT_TRUE(output.empty()); + + for (int i = 1; i <= 1000; i++) { + ASSERT_TRUE(test.Append(i, i * 10)); + } + test.Encode(output, 0, 1000, 100); + + ASSERT_FALSE(output.empty()); + + SeqnoToTimeMapping decoded; + ASSERT_OK(decoded.Add(output)); + ASSERT_OK(decoded.Sort()); + ASSERT_EQ(decoded.Size(), SeqnoToTimeMapping::kMaxSeqnoTimePairsPerSST); + ASSERT_EQ(test.Size(), 1000); + + for (SequenceNumber seq = 0; seq <= 1000; seq++) { + // test has the more accurate time mapping, encode only pick + // kMaxSeqnoTimePairsPerSST number of entries, which is less accurate + uint64_t target_time = test.GetOldestApproximateTime(seq); + ASSERT_GE(decoded.GetOldestApproximateTime(seq), + target_time < 200 ? 0 : target_time - 200); + ASSERT_LE(decoded.GetOldestApproximateTime(seq), target_time); + } +} + +TEST_F(SeqnoTimeTest, EncodeDecodePerferNewTime) { + SeqnoToTimeMapping test(0, 10); + + test.Append(1, 10); + test.Append(5, 17); + test.Append(6, 25); + test.Append(8, 30); + + std::string output; + test.Encode(output, 1, 10, 0, 3); + + SeqnoToTimeMapping decoded; + ASSERT_OK(decoded.Add(output)); + ASSERT_OK(decoded.Sort()); + + ASSERT_EQ(decoded.Size(), 3); + + auto seqs = decoded.TEST_GetInternalMapping(); + std::deque expected; + expected.emplace_back(1, 10); + expected.emplace_back(6, 25); + expected.emplace_back(8, 30); + ASSERT_EQ(expected, seqs); + + // Add a few large time number + test.Append(10, 100); + test.Append(13, 200); + test.Append(16, 300); + + output.clear(); + test.Encode(output, 1, 20, 0, 4); + decoded.Clear(); + ASSERT_OK(decoded.Add(output)); + ASSERT_OK(decoded.Sort()); + ASSERT_EQ(decoded.Size(), 4); + + expected.clear(); + expected.emplace_back(1, 10); + // entry #6, #8 are skipped as they are too close to #1. + // entry #100 is also within skip range, but if it's skipped, there not enough + // number to fill 4 entries, so select it. + expected.emplace_back(10, 100); + expected.emplace_back(13, 200); + expected.emplace_back(16, 300); + seqs = decoded.TEST_GetInternalMapping(); + ASSERT_EQ(expected, seqs); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc new file mode 100644 index 0000000000..6b53fe161d --- /dev/null +++ b/db/seqno_to_time_mapping.cc @@ -0,0 +1,320 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/seqno_to_time_mapping.h" + +#include "db/version_edit.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +uint64_t SeqnoToTimeMapping::GetOldestApproximateTime( + const SequenceNumber seqno) const { + assert(is_sorted_); + auto it = std::upper_bound(seqno_time_mapping_.begin(), + seqno_time_mapping_.end(), seqno); + if (it == seqno_time_mapping_.begin()) { + return 0; + } + it--; + return it->time; +} + +void SeqnoToTimeMapping::Add(SequenceNumber seqno, uint64_t time) { + if (seqno == 0) { + return; + } + is_sorted_ = false; + seqno_time_mapping_.emplace_back(seqno, time); +} + +SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { + assert(is_sorted_); + + if (max_time_duration_ == 0) { + return 0; + } + + const uint64_t cut_off_time = + now > max_time_duration_ ? now - max_time_duration_ : 0; + assert(cut_off_time < now); // no overflow + + auto it = std::upper_bound( + seqno_time_mapping_.begin(), seqno_time_mapping_.end(), cut_off_time, + [](uint64_t target, const SeqnoTimePair& other) -> bool { + return target < other.time; + }); + if (it == seqno_time_mapping_.begin()) { + return 0; + } + it--; + seqno_time_mapping_.erase(seqno_time_mapping_.begin(), it); + + return seqno_time_mapping_.front().seqno; +} + +// The encoded format is: +// [num_of_entries][[seqno][time],[seqno][time],...] +// ^ ^ +// var_int delta_encoded (var_int) +void SeqnoToTimeMapping::Encode(std::string& dest, const SequenceNumber start, + const SequenceNumber end, const uint64_t now, + const uint64_t output_size) const { + assert(is_sorted_); + if (start > end) { + // It could happen when the SST file is empty, the initial value of min + // sequence number is kMaxSequenceNumber and max is 0. + // The empty output file will be removed in the final step of compaction. + return; + } + + auto start_it = std::upper_bound(seqno_time_mapping_.begin(), + seqno_time_mapping_.end(), start); + if (start_it != seqno_time_mapping_.begin()) { + start_it--; + } + + auto end_it = std::upper_bound(seqno_time_mapping_.begin(), + seqno_time_mapping_.end(), end); + if (end_it == seqno_time_mapping_.begin()) { + return; + } + if (start_it >= end_it) { + return; + } + + // truncate old entries that are not needed + if (max_time_duration_ > 0) { + const uint64_t cut_off_time = + now > max_time_duration_ ? now - max_time_duration_ : 0; + while (start_it < end_it && start_it->time < cut_off_time) { + start_it++; + } + } + + // If there are more data than needed, pick the entries for encoding. + // It's not the most optimized algorithm for selecting the best representative + // entries over the time. + // It starts from the beginning and makes sure the distance is larger than + // `(end - start) / size` before selecting the number. For example, for the + // following list, pick 3 entries (it will pick seqno #1, #6, #8): + // 1 -> 10 + // 5 -> 17 + // 6 -> 25 + // 8 -> 30 + // first, it always picks the first one, then there are 2 num_entries_to_fill + // and the time difference between current one vs. the last one is + // (30 - 10) = 20. 20/2 = 10. So it will skip until 10+10 = 20. => it skips + // #5 and pick #6. + // But the most optimized solution is picking #1 #5 #8, as it will be more + // evenly distributed for time. Anyway the following algorithm is simple and + // may over-select new data, which is good. We do want more accurate time + // information for recent data. + std::deque output_copy; + if (std::distance(start_it, end_it) > static_cast(output_size)) { + int64_t num_entries_to_fill = static_cast(output_size); + auto last_it = end_it; + last_it--; + uint64_t end_time = last_it->time; + uint64_t skip_until_time = 0; + for (auto it = start_it; it < end_it; it++) { + // skip if it's not reach the skip_until_time yet + if (std::distance(it, end_it) > num_entries_to_fill && + it->time < skip_until_time) { + continue; + } + output_copy.push_back(*it); + num_entries_to_fill--; + if (std::distance(it, end_it) > num_entries_to_fill && + num_entries_to_fill > 0) { + // If there are more entries than we need, re-calculate the + // skip_until_time, which means skip until that time + skip_until_time = + it->time + ((end_time - it->time) / num_entries_to_fill); + } + } + + // Make sure all entries are filled + assert(num_entries_to_fill == 0); + start_it = output_copy.begin(); + end_it = output_copy.end(); + } + + // Delta encode the data + uint64_t size = std::distance(start_it, end_it); + PutVarint64(&dest, size); + SeqnoTimePair base; + for (auto it = start_it; it < end_it; it++) { + assert(base < *it); + SeqnoTimePair val = *it - base; + base = *it; + val.Encode(dest); + } +} + +Status SeqnoToTimeMapping::Add(const std::string& seqno_time_mapping_str) { + Slice input(seqno_time_mapping_str); + if (input.empty()) { + return Status::OK(); + } + uint64_t size; + if (!GetVarint64(&input, &size)) { + return Status::Corruption("Invalid sequence number time size"); + } + is_sorted_ = false; + SeqnoTimePair base; + for (uint64_t i = 0; i < size; i++) { + SeqnoTimePair val; + Status s = val.Decode(input); + if (!s.ok()) { + return s; + } + val.Add(base); + seqno_time_mapping_.emplace_back(val); + base = val; + } + return Status::OK(); +} + +void SeqnoToTimeMapping::SeqnoTimePair::Encode(std::string& dest) const { + PutVarint64Varint64(&dest, seqno, time); +} + +Status SeqnoToTimeMapping::SeqnoTimePair::Decode(Slice& input) { + if (!GetVarint64(&input, &seqno)) { + return Status::Corruption("Invalid sequence number"); + } + if (!GetVarint64(&input, &time)) { + return Status::Corruption("Invalid time"); + } + return Status::OK(); +} + +bool SeqnoToTimeMapping::Append(SequenceNumber seqno, uint64_t time) { + assert(is_sorted_); + + // skip seq number 0, which may have special meaning, like zeroed out data + if (seqno == 0) { + return false; + } + if (!Empty()) { + if (seqno < Last().seqno || time < Last().time) { + return false; + } + if (seqno == Last().seqno) { + Last().time = time; + return true; + } + if (time == Last().time) { + // new sequence has the same time as old one, no need to add new mapping + return false; + } + } + + seqno_time_mapping_.emplace_back(seqno, time); + + if (seqno_time_mapping_.size() > max_capacity_) { + seqno_time_mapping_.pop_front(); + } + return true; +} + +bool SeqnoToTimeMapping::Resize(uint64_t min_time_duration, + uint64_t max_time_duration) { + uint64_t new_max_capacity = + CalculateMaxCapacity(min_time_duration, max_time_duration); + if (new_max_capacity == max_capacity_) { + return false; + } else if (new_max_capacity < seqno_time_mapping_.size()) { + uint64_t delta = seqno_time_mapping_.size() - new_max_capacity; + seqno_time_mapping_.erase(seqno_time_mapping_.begin(), + seqno_time_mapping_.begin() + delta); + } + max_capacity_ = new_max_capacity; + return true; +} + +Status SeqnoToTimeMapping::Sort() { + if (is_sorted_ || seqno_time_mapping_.empty()) { + return Status::OK(); + } + + std::deque copy = std::move(seqno_time_mapping_); + + std::sort(copy.begin(), copy.end()); + + seqno_time_mapping_.clear(); + + // remove seqno = 0, which may have special meaning, like zeroed out data + while (copy.front().seqno == 0) { + copy.pop_front(); + } + + SeqnoTimePair prev = copy.front(); + for (const auto& it : copy) { + // If sequence number is the same, pick the one with larger time, which is + // more accurate than the older time. + if (it.seqno == prev.seqno) { + assert(it.time >= prev.time); + prev.time = it.time; + } else { + assert(it.seqno > prev.seqno); + // If a larger sequence number has an older time which is not useful, skip + if (it.time > prev.time) { + seqno_time_mapping_.push_back(prev); + prev = it; + } + } + } + seqno_time_mapping_.emplace_back(prev); + + is_sorted_ = true; + return Status::OK(); +} + +std::string SeqnoToTimeMapping::ToHumanString() const { + std::string ret; + for (const auto& seq_time : seqno_time_mapping_) { + AppendNumberTo(&ret, seq_time.seqno); + ret.append("->"); + AppendNumberTo(&ret, seq_time.time); + ret.append(","); + } + return ret; +} + +SeqnoToTimeMapping SeqnoToTimeMapping::Copy( + SequenceNumber smallest_seqno) const { + SeqnoToTimeMapping ret; + auto it = std::upper_bound(seqno_time_mapping_.begin(), + seqno_time_mapping_.end(), smallest_seqno); + if (it != seqno_time_mapping_.begin()) { + it--; + } + std::copy(it, seqno_time_mapping_.end(), + std::back_inserter(ret.seqno_time_mapping_)); + return ret; +} + +uint64_t SeqnoToTimeMapping::CalculateMaxCapacity(uint64_t min_time_duration, + uint64_t max_time_duration) { + if (min_time_duration == 0) { + return 0; + } + return std::min( + kMaxSeqnoToTimeEntries, + max_time_duration * kMaxSeqnoTimePairsPerCF / min_time_duration); +} + +SeqnoToTimeMapping::SeqnoTimePair SeqnoToTimeMapping::SeqnoTimePair::operator-( + const SeqnoTimePair& other) const { + SeqnoTimePair res; + res.seqno = seqno - other.seqno; + res.time = time - other.time; + return res; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h new file mode 100644 index 0000000000..6f5476b340 --- /dev/null +++ b/db/seqno_to_time_mapping.h @@ -0,0 +1,183 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "rocksdb/status.h" +#include "rocksdb/types.h" + +namespace ROCKSDB_NAMESPACE { + +constexpr uint64_t kUnknownSeqnoTime = 0; + +// SeqnoToTimeMapping stores the sequence number to time mapping, so given a +// sequence number it can estimate the oldest possible time for that sequence +// number. For example: +// 10 -> 100 +// 50 -> 300 +// then if a key has seqno 19, the OldestApproximateTime would be 100, for 51 it +// would be 300. +// As it's a sorted list, the new entry is inserted from the back. The old data +// will be popped from the front if they're no longer used. +class SeqnoToTimeMapping { + public: + // Maximum number of entries can be encoded into SST. The data is delta encode + // so the maximum data usage for each SST is < 0.3K + static constexpr uint64_t kMaxSeqnoTimePairsPerSST = 100; + + // Maximum number of entries per CF. If there's only CF with this feature on, + // the max duration divided by this number, so for example, if + // preclude_last_level_data_seconds = 100000 (~1day), then it will sample the + // seqno -> time every 1000 seconds (~17minutes). Then the maximum entry it + // needs is 100. + // When there are multiple CFs having this feature on, the sampling cadence is + // determined by the smallest setting, the capacity is determined the largest + // setting, also it's caped by kMaxSeqnoTimePairsPerCF * 10. + static constexpr uint64_t kMaxSeqnoTimePairsPerCF = 100; + + // A simple struct for sequence number to time pair + struct SeqnoTimePair { + SequenceNumber seqno = 0; + uint64_t time = 0; + + SeqnoTimePair() = default; + SeqnoTimePair(SequenceNumber _seqno, uint64_t _time) + : seqno(_seqno), time(_time) {} + + // Encode to dest string + void Encode(std::string& dest) const; + + // Decode the value from input Slice and remove it from the input + Status Decode(Slice& input); + + // subtraction of 2 SeqnoTimePair + SeqnoTimePair operator-(const SeqnoTimePair& other) const; + + // Add 2 values together + void Add(const SeqnoTimePair& obj) { + seqno += obj.seqno; + time += obj.time; + } + + // Compare SeqnoTimePair with a sequence number, used for binary search a + // sequence number in a list of SeqnoTimePair + bool operator<(const SequenceNumber& other) const { return seqno < other; } + + // Compare 2 SeqnoTimePair + bool operator<(const SeqnoTimePair& other) const { + return std::tie(seqno, time) < std::tie(other.seqno, other.time); + } + + // Check if 2 SeqnoTimePair is the same + bool operator==(const SeqnoTimePair& other) const { + return std::tie(seqno, time) == std::tie(other.seqno, other.time); + } + }; + + // constractor of SeqnoToTimeMapping + // max_time_duration is the maximum time it should track. For example, if + // preclude_last_level_data_seconds is 1 day, then if an entry is older than 1 + // day, then it can be removed. + // max_capacity is the maximum number of entry it can hold. For single CF, + // it's caped at 100 (kMaxSeqnoTimePairsPerCF), otherwise + // kMaxSeqnoTimePairsPerCF * 10. + // If it's set to 0, means it won't truncate any old data. + explicit SeqnoToTimeMapping(uint64_t max_time_duration = 0, + uint64_t max_capacity = 0) + : max_time_duration_(max_time_duration), max_capacity_(max_capacity) {} + + // Append a new entry to the list. The new entry should be newer than the + // existing ones. It maintains the internal sorted status. + bool Append(SequenceNumber seqno, uint64_t time); + + // Given a sequence number, estimate it's oldest time + uint64_t GetOldestApproximateTime(SequenceNumber seqno) const; + + // Truncate the old entries based on the current time and max_time_duration_ + SequenceNumber TruncateOldEntries(uint64_t now); + + // Encode to a binary string + void Encode(std::string& des, SequenceNumber start, SequenceNumber end, + uint64_t now, + uint64_t output_size = kMaxSeqnoTimePairsPerSST) const; + + // Add a new random entry, unlike Append(), it can be any data, but also makes + // the list un-sorted. + void Add(SequenceNumber seqno, uint64_t time); + + // Decode and add the entries to the current obj. The list will be unsorted + Status Add(const std::string& seqno_time_mapping_str); + + // Return the number of entries + size_t Size() const { return seqno_time_mapping_.size(); } + + // Reduce the size of internal list + bool Resize(uint64_t min_time_duration, uint64_t max_time_duration); + + // Override the max_time_duration_ + void SetMaxTimeDuration(uint64_t max_time_duration) { + max_time_duration_ = max_time_duration; + } + + uint64_t GetCapacity() const { return max_capacity_; } + + // Sort the list, which also remove the redundant entries, useless entries, + // which makes sure the seqno is sorted, but also the time + Status Sort(); + + // copy the current obj from the given smallest_seqno. + SeqnoToTimeMapping Copy(SequenceNumber smallest_seqno) const; + + // If the internal list is empty + bool Empty() const { return seqno_time_mapping_.empty(); } + + // clear all entries + void Clear() { seqno_time_mapping_.clear(); } + + // return the string for user message + // Note: Not efficient, okay for print + std::string ToHumanString() const; + +#ifndef NDEBUG + const std::deque& TEST_GetInternalMapping() const { + return seqno_time_mapping_; + } +#endif + + private: + static constexpr uint64_t kMaxSeqnoToTimeEntries = + kMaxSeqnoTimePairsPerCF * 10; + + uint64_t max_time_duration_; + uint64_t max_capacity_; + + std::deque seqno_time_mapping_; + + bool is_sorted_ = true; + + static uint64_t CalculateMaxCapacity(uint64_t min_time_duration, + uint64_t max_time_duration); + + SeqnoTimePair& Last() { + assert(!Empty()); + return seqno_time_mapping_.back(); + } +}; + +// for searching the sequence number from SeqnoToTimeMapping +inline bool operator<(const SequenceNumber& seqno, + const SeqnoToTimeMapping::SeqnoTimePair& other) { + return seqno < other.seqno; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 07a5591a56..32598d04d8 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -874,6 +874,15 @@ struct AdvancedColumnFamilyOptions { // Dynamically changeable through the SetOptions() API Temperature bottommost_temperature = Temperature::kUnknown; + // EXPERIMENTAL + // The feature is still in development and is incomplete. + // If this option is set, when data insert time is within this time range, it + // will be precluded from the last level. + // 0 means no key will be precluded from the last level. + // + // Default: 0 (disable the feature) + uint64_t preclude_last_level_data_seconds = 0; + // When set, large values (blobs) are written to separate blob files, and // only pointers to them are stored in SST files. This can reduce write // amplification for large-value use cases at the cost of introducing a level diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index b91ab604af..9ed17cbb86 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -69,6 +69,7 @@ struct TablePropertiesNames { static const std::string kFileCreationTime; static const std::string kSlowCompressionEstimatedDataSize; static const std::string kFastCompressionEstimatedDataSize; + static const std::string kSequenceNumberTimeMapping; }; // `TablePropertiesCollector` provides the mechanism for users to collect @@ -220,6 +221,7 @@ struct TableProperties { // TODO(sagar0): Should be changed to latest_key_time ... but don't know the // full implications of backward compatibility. Hence retaining for now. uint64_t creation_time = 0; + // Timestamp of the earliest key. 0 means unknown. uint64_t oldest_key_time = 0; // Actual SST file creation time. 0 means unknown. @@ -284,6 +286,9 @@ struct TableProperties { // Compression options used to compress the SST files. std::string compression_options; + // Sequence number to time mapping, delta encoded. + std::string seqno_to_time_mapping; + // user collected properties UserCollectedProperties user_collected_properties; UserCollectedProperties readable_properties; diff --git a/monitoring/stats_history_test.cc b/monitoring/stats_history_test.cc index 1fe5503cbe..d968735e20 100644 --- a/monitoring/stats_history_test.cc +++ b/monitoring/stats_history_test.cc @@ -67,10 +67,10 @@ TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); @@ -99,17 +99,17 @@ TEST_F(StatsHistoryTest, StatsPersistScheduling) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); // Test cancel job through SetOptions ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "0"}})); int old_val = counter; - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec * 2); }); ASSERT_EQ(counter, old_val); @@ -131,7 +131,7 @@ TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) { {{"stats_persist_period_sec", std::to_string(kPeriodSec)}})); ASSERT_EQ(kPeriodSec, dbfull()->GetDBOptions().stats_persist_period_sec); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); ASSERT_GE(counter, 1); Close(); @@ -150,11 +150,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { ReopenWithColumnFamilies({"default", "pikachu"}, options); // make sure the first stats persist to finish - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); std::unique_ptr stats_iter; @@ -172,7 +172,7 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) { ASSERT_GT(stats_count, 0); // Wait a bit and verify no more stats are found for (int i = 0; i < 10; ++i) { - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(1); }); } ASSERT_OK(db_->GetStatsHistory(0, mock_clock_->NowSeconds(), &stats_iter)); @@ -227,7 +227,7 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { const int kIterations = 10; for (int i = 0; i < kIterations; ++i) { - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); } @@ -251,7 +251,7 @@ TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { // Wait for stats persist to finish for (int i = 0; i < kIterations; ++i) { - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); } @@ -300,11 +300,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = @@ -312,14 +312,14 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) { int key_count1 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); int key_count2 = countkeys(iter); delete iter; - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); @@ -393,32 +393,32 @@ TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); // Wait for stats persist to finish - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); countkeys(iter); delete iter; - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); std::map stats_map_after; @@ -482,10 +482,10 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) { ASSERT_EQ(Get(2, "foo"), "bar"); // make sure the first stats persist to finish - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); auto iter = db_->NewIterator(ReadOptions(), dbfull()->PersistentStatsColumnFamily()); @@ -582,7 +582,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { // Wait for the first stats persist to finish, as the initial delay could be // different. - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec - 1); }); ColumnFamilyData* cfd_default = @@ -601,7 +601,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put(1, "Eevee", "v0")); ASSERT_EQ("v0", Get(1, "Eevee")); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flush default cf // LogNumbers: default: 16, stats: 10, pikachu: 5 @@ -630,7 +630,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_EQ("v2", Get("bar2")); ASSERT_EQ("v2", Get("foo2")); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to default and stats cf, flushing default cf // LogNumbers: default: 19, stats: 19, pikachu: 19 @@ -645,7 +645,7 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) { ASSERT_OK(Put(1, "Jolteon", "v3")); ASSERT_EQ("v3", Get(1, "Jolteon")); - dbfull()->TEST_WaitForStatsDumpRun( + dbfull()->TEST_WaitForPeridicWorkerRun( [&] { mock_clock_->MockSleepForSeconds(kPeriodSec); }); // writing to all three cf, flushing test cf // LogNumbers: default: 19, stats: 19, pikachu: 22 diff --git a/options/cf_options.cc b/options/cf_options.cc index a681ea8486..6fbc31151b 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -547,6 +547,10 @@ static std::unordered_map {offsetof(struct ImmutableCFOptions, force_consistency_checks), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"preclude_last_level_data_seconds", + {offsetof(struct ImmutableCFOptions, preclude_last_level_data_seconds), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, // Need to keep this around to be able to read old OPTIONS files. {"max_mem_compaction_level", {0, OptionType::kInt, OptionVerificationType::kDeprecated, @@ -880,6 +884,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options) num_levels(cf_options.num_levels), optimize_filters_for_hits(cf_options.optimize_filters_for_hits), force_consistency_checks(cf_options.force_consistency_checks), + preclude_last_level_data_seconds( + cf_options.preclude_last_level_data_seconds), memtable_insert_with_hint_prefix_extractor( cf_options.memtable_insert_with_hint_prefix_extractor), cf_paths(cf_options.cf_paths), diff --git a/options/cf_options.h b/options/cf_options.h index dccb559a2a..feba505229 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -70,6 +70,8 @@ struct ImmutableCFOptions { bool force_consistency_checks; + uint64_t preclude_last_level_data_seconds; + std::shared_ptr memtable_insert_with_hint_prefix_extractor; diff --git a/options/options.cc b/options/options.cc index 2276c15016..b870bad286 100644 --- a/options/options.cc +++ b/options/options.cc @@ -92,6 +92,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) ttl(options.ttl), periodic_compaction_seconds(options.periodic_compaction_seconds), sample_for_compression(options.sample_for_compression), + preclude_last_level_data_seconds( + options.preclude_last_level_data_seconds), enable_blob_files(options.enable_blob_files), min_blob_size(options.min_blob_size), blob_file_size(options.blob_file_size), @@ -398,6 +400,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { ROCKS_LOG_HEADER(log, " Options.periodic_compaction_seconds: %" PRIu64, periodic_compaction_seconds); + ROCKS_LOG_HEADER(log, " Options.preclude_last_level_data_seconds: %" PRIu64, + preclude_last_level_data_seconds); ROCKS_LOG_HEADER(log, " Options.enable_blob_files: %s", enable_blob_files ? "true" : "false"); ROCKS_LOG_HEADER( diff --git a/options/options_helper.cc b/options/options_helper.cc index 44c57b4369..280f59a4f5 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -306,6 +306,8 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions, cf_opts->compaction_thread_limiter = ioptions.compaction_thread_limiter; cf_opts->sst_partitioner_factory = ioptions.sst_partitioner_factory; cf_opts->blob_cache = ioptions.blob_cache; + cf_opts->preclude_last_level_data_seconds = + ioptions.preclude_last_level_data_seconds; // TODO(yhchiang): find some way to handle the following derived options // * max_file_size diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 3a156a30a7..e4b932a448 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -400,6 +400,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { {offsetof(struct ColumnFamilyOptions, table_properties_collector_factories), sizeof(ColumnFamilyOptions::TablePropertiesCollectorFactories)}, + {offsetof(struct ColumnFamilyOptions, preclude_last_level_data_seconds), + sizeof(uint64_t)}, {offsetof(struct ColumnFamilyOptions, blob_cache), sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)}, @@ -525,6 +527,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "blob_compaction_readahead_size=262144;" "blob_file_starting_level=1;" "bottommost_temperature=kWarm;" + "preclude_last_level_data_seconds=86400;" "compaction_options_fifo={max_table_files_size=3;allow_" "compaction=false;age_for_warm=1;};" "blob_cache=1M;", diff --git a/src.mk b/src.mk index e892ca8992..dc7b426edb 100644 --- a/src.mk +++ b/src.mk @@ -76,6 +76,7 @@ LIB_SOURCES = \ db/range_del_aggregator.cc \ db/range_tombstone_fragmenter.cc \ db/repair.cc \ + db/seqno_to_time_mapping.cc \ db/snapshot_impl.cc \ db/table_cache.cc \ db/table_properties_collector.cc \ @@ -503,6 +504,7 @@ TEST_MAIN_SOURCES = \ db/repair_test.cc \ db/range_del_aggregator_test.cc \ db/range_tombstone_fragmenter_test.cc \ + db/seqno_time_test.cc \ db/table_properties_collector_test.cc \ db/version_builder_test.cc \ db/version_edit_test.cc \ diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index a5f58271c8..565cd8ec81 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -541,7 +541,6 @@ struct BlockBasedTableBuilder::Rep { // These are only needed for populating table properties props.column_family_id = tbo.column_family_id; props.column_family_name = tbo.column_family_name; - props.creation_time = tbo.creation_time; props.oldest_key_time = tbo.oldest_key_time; props.file_creation_time = tbo.file_creation_time; props.orig_file_number = tbo.cur_file_num; @@ -2084,6 +2083,12 @@ const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const { return kUnknownFileChecksumFuncName; } } +void BlockBasedTableBuilder::SetSeqnoTimeTableProperties( + const std::string& encoded_seqno_to_time_mapping, + uint64_t oldest_ancestor_time) { + rep_->props.seqno_to_time_mapping = encoded_seqno_to_time_mapping; + rep_->props.creation_time = oldest_ancestor_time; +} const std::string BlockBasedTable::kObsoleteFilterBlockPrefix = "filter."; const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter."; diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h index 690ae46d7f..83d47af177 100644 --- a/table/block_based/block_based_table_builder.h +++ b/table/block_based/block_based_table_builder.h @@ -100,6 +100,10 @@ class BlockBasedTableBuilder : public TableBuilder { // Get file checksum function name const char* GetFileChecksumFuncName() const override; + void SetSeqnoTimeTableProperties( + const std::string& encoded_seqno_to_time_mapping, + uint64_t oldest_ancestor_time) override; + private: bool ok() const { return status().ok(); } diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 13ecf87143..49ccf1bf2f 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -154,6 +154,10 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { if (!props.compression_options.empty()) { Add(TablePropertiesNames::kCompressionOptions, props.compression_options); } + if (!props.seqno_to_time_mapping.empty()) { + Add(TablePropertiesNames::kSequenceNumberTimeMapping, + props.seqno_to_time_mapping); + } } Slice PropertyBlockBuilder::Finish() { @@ -369,6 +373,8 @@ Status ReadTablePropertiesHelper( new_table_properties->compression_name = raw_val.ToString(); } else if (key == TablePropertiesNames::kCompressionOptions) { new_table_properties->compression_options = raw_val.ToString(); + } else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) { + new_table_properties->seqno_to_time_mapping = raw_val.ToString(); } else { // handle user-collected properties new_table_properties->user_collected_properties.insert( diff --git a/table/plain/plain_table_builder.cc b/table/plain/plain_table_builder.cc index 2c93516011..f3f3497177 100644 --- a/table/plain/plain_table_builder.cc +++ b/table/plain/plain_table_builder.cc @@ -330,6 +330,11 @@ const char* PlainTableBuilder::GetFileChecksumFuncName() const { return kUnknownFileChecksumFuncName; } } +void PlainTableBuilder::SetSeqnoTimeTableProperties(const std::string& string, + uint64_t uint_64) { + // TODO: storing seqno to time mapping is not yet support for plain table. + TableBuilder::SetSeqnoTimeTableProperties(string, uint_64); +} } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/table/plain/plain_table_builder.h b/table/plain/plain_table_builder.h index 562e6392c4..0903b96e87 100644 --- a/table/plain/plain_table_builder.h +++ b/table/plain/plain_table_builder.h @@ -94,6 +94,9 @@ class PlainTableBuilder: public TableBuilder { // Get file checksum function name const char* GetFileChecksumFuncName() const override; + void SetSeqnoTimeTableProperties(const std::string& string, + uint64_t uint_64) override; + private: Arena arena_; const ImmutableOptions& ioptions_; diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 2e45ac11e1..67cfd4d304 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -279,14 +279,16 @@ Status SstFileWriter::Open(const std::string& file_path) { r->column_family_name = ""; cf_id = TablePropertiesCollectorFactory::Context::kUnknownColumnFamily; } + + // TODO: it would be better to set oldest_key_time to be used for getting the + // approximate time of ingested keys. TableBuilderOptions table_builder_options( r->ioptions, r->mutable_cf_options, r->internal_comparator, &int_tbl_prop_collector_factories, compression_type, compression_opts, cf_id, r->column_family_name, unknown_level, false /* is_bottommost */, - TableFileCreationReason::kMisc, 0 /* creation_time */, - 0 /* oldest_key_time */, 0 /* file_creation_time */, - "SST Writer" /* db_id */, r->db_session_id, 0 /* target_file_size */, - r->next_file_number); + TableFileCreationReason::kMisc, 0 /* oldest_key_time */, + 0 /* file_creation_time */, "SST Writer" /* db_id */, r->db_session_id, + 0 /* target_file_size */, r->next_file_number); // External SST files used to each get a unique session id. Now for // slightly better uniqueness probability in constructing cache keys, we // assign fake file numbers to each file (into table properties) and keep diff --git a/table/table_builder.h b/table/table_builder.h index 6060c6ab59..c1d9b8c156 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -16,6 +16,7 @@ #include #include "db/dbformat.h" +#include "db/seqno_to_time_mapping.h" #include "db/table_properties_collector.h" #include "file/writable_file_writer.h" #include "options/cf_options.h" @@ -109,7 +110,7 @@ struct TableBuilderOptions { const std::string& _column_family_name, int _level, bool _is_bottommost = false, TableFileCreationReason _reason = TableFileCreationReason::kMisc, - const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0, + const int64_t _oldest_key_time = 0, const uint64_t _file_creation_time = 0, const std::string& _db_id = "", const std::string& _db_session_id = "", const uint64_t _target_file_size = 0, const uint64_t _cur_file_num = 0) @@ -121,7 +122,6 @@ struct TableBuilderOptions { compression_opts(_compression_opts), column_family_id(_column_family_id), column_family_name(_column_family_name), - creation_time(_creation_time), oldest_key_time(_oldest_key_time), target_file_size(_target_file_size), file_creation_time(_file_creation_time), @@ -140,7 +140,6 @@ struct TableBuilderOptions { const CompressionOptions& compression_opts; const uint32_t column_family_id; const std::string& column_family_name; - const uint64_t creation_time; const int64_t oldest_key_time; const uint64_t target_file_size; const uint64_t file_creation_time; @@ -222,6 +221,11 @@ class TableBuilder { // Return file checksum function name virtual const char* GetFileChecksumFuncName() const = 0; + + // Set the sequence number to time mapping + virtual void SetSeqnoTimeTableProperties( + const std::string& /*encoded_seqno_to_time_mapping*/, + uint64_t /*oldest_ancestor_time*/){}; }; } // namespace ROCKSDB_NAMESPACE diff --git a/table/table_properties.cc b/table/table_properties.cc index e15da6a66d..a88686651b 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -5,6 +5,7 @@ #include "rocksdb/table_properties.h" +#include "db/seqno_to_time_mapping.h" #include "port/malloc.h" #include "port/port.h" #include "rocksdb/env.h" @@ -164,6 +165,12 @@ std::string TableProperties::ToString( s.ok() ? UniqueIdToHumanString(id) : "N/A", prop_delim, kv_delim); + SeqnoToTimeMapping seq_time_mapping; + s = seq_time_mapping.Add(seqno_to_time_mapping); + AppendProperty(result, "Sequence number to time mapping", + s.ok() ? seq_time_mapping.ToHumanString() : "N/A", prop_delim, + kv_delim); + return result; } @@ -307,6 +314,8 @@ const std::string TablePropertiesNames::kSlowCompressionEstimatedDataSize = "rocksdb.sample_for_compression.slow.data.size"; const std::string TablePropertiesNames::kFastCompressionEstimatedDataSize = "rocksdb.sample_for_compression.fast.data.size"; +const std::string TablePropertiesNames::kSequenceNumberTimeMapping = + "rocksdb.seqno.time.map"; #ifndef NDEBUG // WARNING: TEST_SetRandomTableProperties assumes the following layout of diff --git a/test_util/mock_time_env.h b/test_util/mock_time_env.h index 873b352d5c..7834368e03 100644 --- a/test_util/mock_time_env.h +++ b/test_util/mock_time_env.h @@ -60,8 +60,9 @@ class MockSystemClock : public SystemClockWrapper { void MockSleepForSeconds(int seconds) { assert(seconds >= 0); - int micros = seconds * kMicrosInSecond; - SleepForMicroseconds(micros); + uint64_t micros = static_cast(seconds) * kMicrosInSecond; + assert(current_time_us_ + micros >= current_time_us_); + current_time_us_.fetch_add(micros); } // TODO: this is a workaround for the different behavior on different platform diff --git a/util/timer.h b/util/timer.h index 6571dc7de0..4b9ab668ec 100644 --- a/util/timer.h +++ b/util/timer.h @@ -186,6 +186,12 @@ class Timer { } return ret; } + + bool TEST_HasVaildTask(const std::string& func_name) const { + InstrumentedMutexLock l(&mutex_); + auto it = map_.find(func_name); + return it != map_.end() && it->second->IsValid(); + } #endif // NDEBUG private: