diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index 0bf3436390..1370a54604 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -80,7 +80,7 @@ jobs: steps: - uses: actions/checkout@v4.1.0 - uses: "./.github/actions/pre-steps" - - run: PORTABLE=1 make V=1 -j32 valgrind_test + - run: make V=1 -j32 valgrind_test - uses: "./.github/actions/post-steps" build-windows-vs2022-avx2: if: ${{ github.repository_owner == 'facebook' }} diff --git a/Makefile b/Makefile index db1e7c2e2d..ad3add80f5 100644 --- a/Makefile +++ b/Makefile @@ -630,6 +630,11 @@ VALGRIND_VER := $(join $(VALGRIND_VER),valgrind) VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full # Not yet supported: --show-leak-kinds=definite,possible,reachable --errors-for-leak-kinds=definite,possible,reachable +# Work around valgrind hanging on systems with limited internet access +ifneq ($(shell which git 2>/dev/null && git config --get https.proxy),) + export DEBUGINFOD_URLS= +endif + TEST_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(TEST_LIB_SOURCES) $(MOCK_LIB_SOURCES)) $(GTEST) BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(BENCH_LIB_SOURCES)) CACHE_BENCH_OBJECTS = $(patsubst %.cc, $(OBJ_DIR)/%.o, $(CACHE_BENCH_LIB_SOURCES)) @@ -1164,16 +1169,16 @@ ubsan_crash_test_with_best_efforts_recovery: clean $(MAKE) clean full_valgrind_test: - ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check + ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check full_valgrind_test_some: - ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check_some + ROCKSDB_FULL_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check_some valgrind_test: - ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check + ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check valgrind_test_some: - ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 $(MAKE) valgrind_check_some + 
ROCKSDB_VALGRIND_RUN=1 DISABLE_JEMALLOC=1 PORTABLE=1 $(MAKE) valgrind_check_some valgrind_check: $(TESTS) $(MAKE) DRIVER="$(VALGRIND_VER) $(VALGRIND_OPTS)" gen_parallel_tests diff --git a/TARGETS b/TARGETS index fb4588ef76..4b79a73e42 100644 --- a/TARGETS +++ b/TARGETS @@ -417,6 +417,8 @@ cpp_binary_wrapper(name="ldb", srcs=["tools/ldb.cc"], deps=[":rocksdb_tools_lib" cpp_binary_wrapper(name="db_stress", srcs=["db_stress_tool/db_stress.cc"], deps=[":rocksdb_stress_lib"], extra_preprocessor_flags=[], extra_bench_libs=False) +cpp_binary_wrapper(name="db_bench", srcs=["tools/db_bench.cc"], deps=[":rocksdb_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False) + cpp_binary_wrapper(name="cache_bench", srcs=["cache/cache_bench.cc"], deps=[":rocksdb_cache_bench_tools_lib"], extra_preprocessor_flags=[], extra_bench_libs=False) cpp_binary_wrapper(name="ribbon_bench", srcs=["microbench/ribbon_bench.cc"], deps=[], extra_preprocessor_flags=[], extra_bench_libs=True) diff --git a/buckifier/buckify_rocksdb.py b/buckifier/buckify_rocksdb.py index f7831c6907..815e55d7bf 100755 --- a/buckifier/buckify_rocksdb.py +++ b/buckifier/buckify_rocksdb.py @@ -201,6 +201,10 @@ def generate_targets(repo_path, deps_map): TARGETS.add_binary( "db_stress", ["db_stress_tool/db_stress.cc"], [":rocksdb_stress_lib"] ) + # db_bench binary + TARGETS.add_binary( + "db_bench", ["tools/db_bench.cc"], [":rocksdb_tools_lib"] + ) # cache_bench binary TARGETS.add_binary( "cache_bench", ["cache/cache_bench.cc"], [":rocksdb_cache_bench_tools_lib"] diff --git a/cache/cache_test.cc b/cache/cache_test.cc index 462c2ec741..12bcfe6cd4 100644 --- a/cache/cache_test.cc +++ b/cache/cache_test.cc @@ -886,6 +886,32 @@ TEST_P(CacheTest, ApplyToAllEntriesDuringResize) { ASSERT_EQ(special_count, kSpecialCount); } +TEST_P(CacheTest, ApplyToHandleTest) { + std::string callback_state; + const auto callback = [&](const Slice& key, Cache::ObjectPtr value, + size_t charge, + const Cache::CacheItemHelper* helper) { + 
callback_state = std::to_string(DecodeKey(key)) + "," + + std::to_string(DecodeValue(value)) + "," + + std::to_string(charge); + assert(helper == &CacheTest::kHelper); + }; + + std::vector inserted; + + for (int i = 0; i < 10; ++i) { + Insert(i, i * 2, i + 1); + inserted.push_back(std::to_string(i) + "," + std::to_string(i * 2) + "," + + std::to_string(i + 1)); + } + for (int i = 0; i < 10; ++i) { + Cache::Handle* handle = cache_->Lookup(EncodeKey(i)); + cache_->ApplyToHandle(cache_.get(), handle, callback); + EXPECT_EQ(inserted[i], callback_state); + cache_->Release(handle); + } +} + TEST_P(CacheTest, DefaultShardBits) { // Prevent excessive allocation (to save time & space) estimated_value_size_ = 100000; diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc index 078b922dd3..090213cb0d 100644 --- a/cache/clock_cache.cc +++ b/cache/clock_cache.cc @@ -1444,6 +1444,22 @@ const Cache::CacheItemHelper* BaseHyperClockCache::GetCacheItemHelper( return h->helper; } +template +void BaseHyperClockCache
::ApplyToHandle( + Cache* cache, Handle* handle, + const std::function& + callback) { + BaseHyperClockCache
* cache_ptr = + static_cast*>(cache); + auto h = static_cast(handle); + UniqueId64x2 unhashed; + auto hash_seed = cache_ptr->GetShard(h->GetHash()).GetTable().GetHashSeed(); + callback( + ClockCacheShard
::ReverseHash(h->hashed_key, &unhashed, hash_seed), + h->value, h->GetTotalCharge(), h->helper); +} + namespace { // For each cache shard, estimate what the table load factor would be if diff --git a/cache/clock_cache.h b/cache/clock_cache.h index 7423fa1f41..2d5d0d9eef 100644 --- a/cache/clock_cache.h +++ b/cache/clock_cache.h @@ -1128,6 +1128,12 @@ class BaseHyperClockCache : public ShardedCache> { const CacheItemHelper* GetCacheItemHelper(Handle* handle) const override; + void ApplyToHandle( + Cache* cache, Handle* handle, + const std::function& + callback) override; + void ReportProblems( const std::shared_ptr& /*info_log*/) const override; }; diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index 79c46bcc5c..230a6726ca 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -677,6 +677,17 @@ const Cache::CacheItemHelper* LRUCache::GetCacheItemHelper( return h->helper; } +void LRUCache::ApplyToHandle( + Cache* cache, Handle* handle, + const std::function& callback) { + auto cache_ptr = static_cast(cache); + auto h = static_cast(handle); + callback(h->key(), h->value, + h->GetCharge(cache_ptr->GetShard(0).metadata_charge_policy_), + h->helper); +} + size_t LRUCache::TEST_GetLRUSize() { return SumOverShards([](LRUCacheShard& cs) { return cs.TEST_GetLRUSize(); }); } diff --git a/cache/lru_cache.h b/cache/lru_cache.h index 045480fbcf..7fb2a88a00 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -452,6 +452,12 @@ class LRUCache size_t GetCharge(Handle* handle) const override; const CacheItemHelper* GetCacheItemHelper(Handle* handle) const override; + void ApplyToHandle( + Cache* cache, Handle* handle, + const std::function& callback) + override; + // Retrieves number of elements in LRU, for unit test purpose only. size_t TEST_GetLRUSize(); // Retrieves high pri pool ratio. 
diff --git a/cache/tiered_secondary_cache_test.cc b/cache/tiered_secondary_cache_test.cc index b32033133e..e4b4202265 100644 --- a/cache/tiered_secondary_cache_test.cc +++ b/cache/tiered_secondary_cache_test.cc @@ -253,6 +253,7 @@ TEST_F(DBTieredSecondaryCacheTest, BasicTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); // Disable paranoid_file_checks so that flush will not read back the newly @@ -364,6 +365,7 @@ TEST_F(DBTieredSecondaryCacheTest, BasicMultiGetTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.paranoid_file_checks = false; @@ -506,6 +508,7 @@ TEST_F(DBTieredSecondaryCacheTest, WaitAllTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.paranoid_file_checks = false; @@ -606,6 +609,7 @@ TEST_F(DBTieredSecondaryCacheTest, ReadyBeforeWaitAllTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.statistics = CreateDBStatistics(); @@ -717,6 +721,7 @@ TEST_F(DBTieredSecondaryCacheTest, IterateTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.paranoid_file_checks = false; 
@@ -784,6 +789,7 @@ TEST_P(DBTieredAdmPolicyTest, CompressedOnlyTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); size_t comp_cache_usage = compressed_secondary_cache()->TEST_GetUsage(); @@ -836,6 +842,7 @@ TEST_P(DBTieredAdmPolicyTest, CompressedCacheAdmission) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); size_t comp_cache_usage = compressed_secondary_cache()->TEST_GetUsage(); @@ -937,6 +944,7 @@ TEST_F(DBTieredSecondaryCacheTest, FSBufferTest) { table_options.cache_index_and_filter_blocks = false; Options options = GetDefaultOptions(); options.create_if_missing = true; + options.compression = kLZ4Compression; options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.statistics = CreateDBStatistics(); options.env = wrap_env.get(); diff --git a/db/column_family.cc b/db/column_family.cc index 06e2b4365d..2b611fda7c 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1565,28 +1565,6 @@ Status ColumnFamilyData::SetOptions( return s; } -// REQUIRES: DB mutex held -Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) { - if (initial_cf_options_.compaction_style != kCompactionStyleLevel) { - return Env::WLTH_NOT_SET; - } - if (level == 0) { - return Env::WLTH_MEDIUM; - } - int base_level = current_->storage_info()->base_level(); - - // L1: medium, L2: long, ... - if (level - base_level >= 2) { - return Env::WLTH_EXTREME; - } else if (level < base_level) { - // There is no restriction which prevents level passed in to be smaller - // than base_level. 
- return Env::WLTH_MEDIUM; - } - return static_cast( - level - base_level + static_cast(Env::WLTH_MEDIUM)); -} - Status ColumnFamilyData::AddDirectories( std::map>* created_dirs) { Status s; diff --git a/db/column_family.h b/db/column_family.h index 18fc41e177..e4b7adde89 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -511,8 +511,6 @@ class ColumnFamilyData { return initial_cf_options_; } - Env::WriteLifeTimeHint CalculateSSTWriteHint(int level); - // created_dirs remembers directory created, so that we don't need to call // the same data creation operation again. Status AddDirectories( diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc index 1c014663cb..fc76f93f1c 100644 --- a/db/compaction/compaction.cc +++ b/db/compaction/compaction.cc @@ -686,12 +686,11 @@ bool Compaction::KeyRangeNotExistsBeyondOutputLevel( }; // Mark (or clear) each file that is being compacted -void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) { +void Compaction::MarkFilesBeingCompacted(bool being_compacted) const { for (size_t i = 0; i < num_input_levels(); i++) { for (size_t j = 0; j < inputs_[i].size(); j++) { - assert(mark_as_compacted ? 
!inputs_[i][j]->being_compacted - : inputs_[i][j]->being_compacted); - inputs_[i][j]->being_compacted = mark_as_compacted; + assert(being_compacted != inputs_[i][j]->being_compacted); + inputs_[i][j]->being_compacted = being_compacted; } } } @@ -735,7 +734,7 @@ uint64_t Compaction::CalculateTotalInputSize() const { return size; } -void Compaction::ReleaseCompactionFiles(Status status) { +void Compaction::ReleaseCompactionFiles(const Status& status) { MarkFilesBeingCompacted(false); cfd_->compaction_picker()->ReleaseCompactionFiles(this, status); } diff --git a/db/compaction/compaction.h b/db/compaction/compaction.h index 22157eb2c3..633e68a9ea 100644 --- a/db/compaction/compaction.h +++ b/db/compaction/compaction.h @@ -230,7 +230,7 @@ class Compaction { // Delete this compaction from the list of running compactions. // // Requirement: DB mutex held - void ReleaseCompactionFiles(Status status); + void ReleaseCompactionFiles(const Status& status); // Returns the summary of the compaction in "output" with maximum "len" // in bytes. 
The caller is responsible for the memory management of @@ -435,13 +435,13 @@ class Compaction { const int start_level, const int output_level); + // mark (or clear) all files that are being compacted + void MarkFilesBeingCompacted(bool being_compacted) const; + private: Status InitInputTableProperties(); - // mark (or clear) all files that are being compacted - void MarkFilesBeingCompacted(bool mark_as_compacted); - // get the smallest and largest key present in files to be compacted static void GetBoundaryKeys(VersionStorageInfo* vstorage, const std::vector& inputs, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 445ca8afc0..b4b4eeacee 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -251,12 +251,13 @@ void CompactionJob::Prepare() { // Generate file_levels_ for compaction before making Iterator auto* c = compact_->compaction; - ColumnFamilyData* cfd = c->column_family_data(); + [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data(); assert(cfd != nullptr); - assert(cfd->current()->storage_info()->NumLevelFiles( - compact_->compaction->level()) > 0); + const VersionStorageInfo* storage_info = c->input_version()->storage_info(); + assert(storage_info); + assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); - write_hint_ = cfd->CalculateSSTWriteHint(c->output_level()); + write_hint_ = storage_info->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); if (c->ShouldFormSubcompactions()) { @@ -297,8 +298,8 @@ void CompactionJob::Prepare() { for (const auto& each_level : *c->inputs()) { for (const auto& fmd : each_level.files) { std::shared_ptr tp; - Status s = - cfd->current()->GetTableProperties(read_options, &tp, fmd, nullptr); + Status s = c->input_version()->GetTableProperties(read_options, &tp, + fmd, nullptr); if (s.ok()) { s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping); } diff --git 
a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index e049d95b24..cc47060b5f 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -133,7 +133,8 @@ CompactionPicker::CompactionPicker(const ImmutableOptions& ioptions, CompactionPicker::~CompactionPicker() = default; // Delete this compaction from the list of running compactions. -void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { +void CompactionPicker::ReleaseCompactionFiles(Compaction* c, + const Status& status) { UnregisterCompaction(c); if (!status.ok()) { c->ResetNextCompactionIndex(); diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h index 88915d4594..087595a8a6 100644 --- a/db/compaction/compaction_picker.h +++ b/db/compaction/compaction_picker.h @@ -104,7 +104,7 @@ class CompactionPicker { // Free up the files that participated in a compaction // // Requirement: DB mutex held - void ReleaseCompactionFiles(Compaction* c, Status status); + void ReleaseCompactionFiles(Compaction* c, const Status& status); // Returns true if any one of the specified files are being compacted bool AreFilesInCompaction(const std::vector& files); diff --git a/db/compaction/compaction_service_job.cc b/db/compaction/compaction_service_job.cc index 3b56d057b4..a923e4fcc4 100644 --- a/db/compaction/compaction_service_job.cc +++ b/db/compaction/compaction_service_job.cc @@ -261,11 +261,11 @@ Status CompactionServiceCompactionJob::Run() { auto* c = compact_->compaction; assert(c->column_family_data() != nullptr); - assert(c->column_family_data()->current()->storage_info()->NumLevelFiles( - compact_->compaction->level()) > 0); + const VersionStorageInfo* storage_info = c->input_version()->storage_info(); + assert(storage_info); + assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); - write_hint_ = - c->column_family_data()->CalculateSSTWriteHint(c->output_level()); + write_hint_ = 
storage_info->CalculateSSTWriteHint(c->output_level()); bottommost_level_ = c->bottommost_level(); Slice begin = compaction_input_.begin; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 83b39a5218..ed918f0b9e 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -6146,7 +6146,7 @@ TEST_F(DBCompactionTest, CompactionLimiter) { std::vector pending_compaction_cfs; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "SchedulePendingCompaction::cfd", [&](void* arg) { + "EnqueuePendingCompaction::cfd", [&](void* arg) { const std::string& cf_name = static_cast(arg)->GetName(); pending_compaction_cfs.emplace_back(cf_name); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 392cbac41b..3527278924 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -473,7 +473,7 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) { if (s.ok()) { for (auto cfd : *versions_->GetColumnFamilySet()) { - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); } MaybeScheduleFlushOrCompaction(); } @@ -4282,7 +4282,7 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { ->storage_info() ->BottommostFilesMarkedForCompaction() .empty()) { - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); cf_scheduled.push_back(cfd); } @@ -5837,6 +5837,10 @@ Status DBImpl::IngestExternalFiles( "allow_db_generated_files."); } } + if (ingest_opts.move_files && ingest_opts.link_files) { + return Status::InvalidArgument( + "`move_files` and `link_files` can not both be true."); + } } // TODO (yanqin) maybe handle the case in which column_families have diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e3eb3253e6..1b3bafaae0 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2216,10 +2216,27 @@ class DBImpl : public DB { void GenerateFlushRequest(const autovector& cfds, FlushReason flush_reason, FlushRequest* req); - // Returns true if `req` is successfully 
enqueued. - bool SchedulePendingFlush(const FlushRequest& req); + // Below functions are for executing flush, compaction in the background. A + // dequeue is the communication channel between threads that asks for the work + // to be done and the available threads in the thread pool that pick it up to + // execute it. We use these terminologies to describe the state of the work + // and its transitions: + // 1) It becomes pending once it's successfully enqueued into the + // corresponding dequeue, a work in this state is also called unscheduled. + // Counter `unscheduled_*_` counts work in this state. + // 2) When `MaybeScheduleFlushOrCompaction` schedule a thread to run `BGWork*` + // for the work, it becomes scheduled + // Counter `bg_*_scheduled_` counts work in this state. + // 3) Once the thread start to execute `BGWork*`, the work is popped from the + // dequeue, it is now in running state + // Counter `num_running_*_` counts work in this state. + // 4) Eventually, the work is finished. We don't need to specifically track + // finished work. - void SchedulePendingCompaction(ColumnFamilyData* cfd); + // Returns true if `req` is successfully enqueued. + bool EnqueuePendingFlush(const FlushRequest& req); + + void EnqueuePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingPurge(std::string fname, std::string dir_to_sync, FileType type, uint64_t number, int job_id); static void BGWorkCompaction(void* arg); @@ -2946,6 +2963,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src, CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options); +// Return a VersionEdit for the DB's recovery when the `memtables` of the +// specified column family are obsolete. Specifically, the min log number to +// keep, and the WAL files that can be deleted. 
+VersionEdit GetDBRecoveryEditForObsoletingMemTables( + VersionSet* vset, const ColumnFamilyData& cfd, + const autovector& edit_list, + const autovector& memtables, LogsWithPrepTracker* prep_tracker); + // Return the earliest log file to keep after the memtable flush is // finalized. // `cfd_to_flush` is the column family whose memtable (specified in diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index f8fe021311..3fb8af4477 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -1880,7 +1880,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { Status status = versions_->LogAndApply(cfd, mutable_cf_options, read_options, write_options, &edit, &mutex_, directories_.GetDbDir()); - + c->MarkFilesBeingCompacted(false); cfd->compaction_picker()->UnregisterCompaction(c.get()); c.reset(); @@ -2377,7 +2377,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, ColumnFamilyData* loop_cfd = req.cfd_to_max_mem_id_to_persist.begin()->first; bool already_queued_for_flush = loop_cfd->queued_for_flush(); - bool flush_req_enqueued = SchedulePendingFlush(req); + bool flush_req_enqueued = EnqueuePendingFlush(req); if (already_queued_for_flush || flush_req_enqueued) { loop_cfd->SetFlushSkipReschedule(); } @@ -2528,7 +2528,7 @@ Status DBImpl::AtomicFlushMemTables( } } GenerateFlushRequest(cfds, flush_reason, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); MaybeScheduleFlushOrCompaction(); } @@ -2583,7 +2583,7 @@ Status DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason, if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, flush_reason, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) { flush_memtable_ids.push_back(iter.second); } @@ -2597,7 +2597,7 @@ Status 
DBImpl::RetryFlushesForErrorRecovery(FlushReason flush_reason, flush_reason, {{cfd, std::numeric_limits::max() /* max_mem_id_to_persist */}}}; - if (SchedulePendingFlush(flush_req)) { + if (EnqueuePendingFlush(flush_req)) { cfd->SetFlushSkipReschedule(); }; } @@ -2950,6 +2950,7 @@ void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) { cfd->Ref(); compaction_queue_.push_back(cfd); cfd->set_queued_for_compaction(true); + ++unscheduled_compactions_; } ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() { @@ -3005,7 +3006,7 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue( return cfd; } -bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) { +bool DBImpl::EnqueuePendingFlush(const FlushRequest& flush_req) { mutex_.AssertHeld(); bool enqueued = false; if (reject_new_background_jobs_) { @@ -3041,16 +3042,15 @@ bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) { return enqueued; } -void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { +void DBImpl::EnqueuePendingCompaction(ColumnFamilyData* cfd) { mutex_.AssertHeld(); if (reject_new_background_jobs_) { return; } if (!cfd->queued_for_compaction() && cfd->NeedsCompaction()) { - TEST_SYNC_POINT_CALLBACK("SchedulePendingCompaction::cfd", + TEST_SYNC_POINT_CALLBACK("EnqueuePendingCompaction::cfd", static_cast(cfd)); AddToCompactionQueue(cfd); - ++unscheduled_compactions_; } } @@ -3218,7 +3218,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, #ifndef NDEBUG flush_req.reschedule_count += 1; #endif /* !NDEBUG */ - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); *reason = flush_reason; *flush_rescheduled_to_retain_udt = true; return Status::TryAgain(); @@ -3678,7 +3678,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ->ComputeCompactionScore(*(c->immutable_options()), *(c->mutable_cf_options())); AddToCompactionQueue(cfd); - ++unscheduled_compactions_; c.reset(); // Don't need to sleep here, because 
BackgroundCallCompaction @@ -3707,7 +3706,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, if (cfd->NeedsCompaction()) { // Yes, we need more compactions! AddToCompactionQueue(cfd); - ++unscheduled_compactions_; MaybeScheduleFlushOrCompaction(); } } @@ -3997,7 +3995,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *(c->mutable_cf_options())); if (!cfd->queued_for_compaction()) { AddToCompactionQueue(cfd); - ++unscheduled_compactions_; } } } @@ -4269,7 +4266,7 @@ void DBImpl::InstallSuperVersionAndScheduleWork( // Whenever we install new SuperVersion, we might need to issue new flushes or // compactions. - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); // Update max_total_in_memory_state_ diff --git a/db/db_impl/db_impl_experimental.cc b/db/db_impl/db_impl_experimental.cc index 113a7f42ff..f802fb9568 100644 --- a/db/db_impl/db_impl_experimental.cc +++ b/db/db_impl/db_impl_experimental.cc @@ -47,7 +47,7 @@ Status DBImpl::SuggestCompactRange(ColumnFamilyHandle* column_family, // compaction score vstorage->ComputeCompactionScore(*cfd->ioptions(), *cfd->GetLatestMutableCFOptions()); - SchedulePendingCompaction(cfd); + EnqueuePendingCompaction(cfd); MaybeScheduleFlushOrCompaction(); } return Status::OK(); diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 0db7293682..bb0ff3985a 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() { mutex_.Lock(); } +VersionEdit GetDBRecoveryEditForObsoletingMemTables( + VersionSet* vset, const ColumnFamilyData& cfd, + const autovector& edit_list, + const autovector& memtables, LogsWithPrepTracker* prep_tracker) { + VersionEdit wal_deletion_edit; + uint64_t min_wal_number_to_keep = 0; + assert(edit_list.size() > 0); + if (vset->db_options()->allow_2pc) { + // Note that if mempurge is successful, the edit_list will + // not be applicable (contains info of new 
min_log number to keep, + // and level 0 file path of SST file created during normal flush, + // so both pieces of information are irrelevant after a successful + // mempurge operation). + min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( + vset, cfd, edit_list, memtables, prep_tracker); + + // We piggyback the information of earliest log file to keep in the + // manifest entry for the last file flushed. + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list); + } + + wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep); + if (vset->db_options()->track_and_verify_wals_in_manifest) { + if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { + wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep); + } + } + return wal_deletion_edit; +} + uint64_t FindMinPrepLogReferencedByMemTable( VersionSet* vset, const autovector& memtables_to_flush) { uint64_t min_log = 0; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index a58a142d71..0697cc20f1 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1681,7 +1681,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, meta.oldest_ancester_time = current_time; meta.epoch_number = cfd->NewEpochNumber(); { - auto write_hint = cfd->CalculateSSTWriteHint(0); + auto write_hint = + cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0); mutex_.Unlock(); SequenceNumber earliest_write_conflict_snapshot; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index d6899502ae..da773bac0a 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1789,13 +1789,13 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) { if (!immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWalFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } } if 
(immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWalFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } @@ -1881,13 +1881,13 @@ Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWriteBufferManager, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } } if (immutable_db_options_.atomic_flush) { FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWriteBufferManager, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } MaybeScheduleFlushOrCompaction(); } @@ -2163,12 +2163,12 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { AssignAtomicFlushSeq(cfds); FlushRequest flush_req; GenerateFlushRequest(cfds, FlushReason::kWriteBufferFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } else { for (auto* cfd : cfds) { FlushRequest flush_req; GenerateFlushRequest({cfd}, FlushReason::kWriteBufferFull, &flush_req); - SchedulePendingFlush(flush_req); + EnqueuePendingFlush(flush_req); } } MaybeScheduleFlushOrCompaction(); diff --git a/db/db_io_failure_test.cc b/db/db_io_failure_test.cc index 9826ab6680..b72c259987 100644 --- a/db/db_io_failure_test.cc +++ b/db/db_io_failure_test.cc @@ -895,6 +895,81 @@ TEST_P(DBIOCorruptionTest, ManifestCorruptionRetry) { SyncPoint::GetInstance()->DisableProcessing(); } +TEST_P(DBIOCorruptionTest, FooterReadCorruptionRetry) { + Random rnd(300); + bool retry = false; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "ReadFooterFromFileInternal:0", [&](void* arg) { + Slice* data = static_cast(arg); + if (!retry) { + std::memcpy(const_cast(data->data()), + rnd.RandomString(static_cast(data->size())).c_str(), + data->size()); + retry = true; + } + }); + 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key1", "val1")); + Status s = Flush(); + if (std::get<2>(GetParam())) { + ASSERT_OK(s); + ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1); + ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT), + 1); + + std::string val; + ReadOptions ro; + ro.async_io = std::get<1>(GetParam()); + ASSERT_OK(dbfull()->Get(ro, "key1", &val)); + ASSERT_EQ(val, "val1"); + } else { + ASSERT_NOK(s); + ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0); + ASSERT_GT(stats()->getTickerCount(SST_FOOTER_CORRUPTION_COUNT), 0); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_P(DBIOCorruptionTest, TablePropertiesCorruptionRetry) { + Random rnd(300); + bool retry = false; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "ReadTablePropertiesHelper:0", [&](void* arg) { + Slice* data = static_cast(arg); + if (!retry) { + std::memcpy(const_cast(data->data()), + rnd.RandomString(static_cast(data->size())).c_str(), + data->size()); + retry = true; + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key1", "val1")); + Status s = Flush(); + if (std::get<2>(GetParam())) { + ASSERT_OK(s); + ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 1); + ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT), + 1); + + std::string val; + ReadOptions ro; + ro.async_io = std::get<1>(GetParam()); + ASSERT_OK(dbfull()->Get(ro, "key1", &val)); + ASSERT_EQ(val, "val1"); + } else { + ASSERT_NOK(s); + ASSERT_EQ(stats()->getTickerCount(FILE_READ_CORRUPTION_RETRY_COUNT), 0); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + // The parameters are - 1. Use FS provided buffer, 2. 
Use async IO ReadOption, // 3. Retry with verify_and_reconstruct_read IOOption INSTANTIATE_TEST_CASE_P(DBIOCorruptionTest, DBIOCorruptionTest, diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 750c9641a5..7e5a975625 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -113,7 +113,7 @@ Status ExternalSstFileIngestionJob::Prepare( const std::string path_outside_db = f.external_file_path; const std::string path_inside_db = TableFileName( cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId()); - if (ingestion_options_.move_files) { + if (ingestion_options_.move_files || ingestion_options_.link_files) { status = fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr); if (status.ok()) { @@ -626,8 +626,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { DeleteInternalFiles(); consumed_seqno_count_ = 0; files_overlap_ = false; - } else if (status.ok() && ingestion_options_.move_files && - !ingestion_options_.allow_db_generated_files) { + } else if (status.ok() && ingestion_options_.move_files) { // The files were moved and added successfully, remove original file links for (IngestedFileInfo& f : files_to_ingest_) { Status s = fs_->DeleteFile(f.external_file_path, io_opts, nullptr); diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index 3a15c8ef10..17793c4931 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -66,7 +66,7 @@ class ExternalSSTFileTestBase : public DBTestBase { class ExternSSTFileLinkFailFallbackTest : public ExternalSSTFileTestBase, - public ::testing::WithParamInterface> { + public ::testing::WithParamInterface> { public: ExternSSTFileLinkFailFallbackTest() { fs_ = std::make_shared(env_->GetFileSystem(), true); @@ -2210,7 +2210,8 @@ TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) { DestroyAndReopen(options_); const int kNumKeys = 10000; 
IngestExternalFileOptions ifo; - ifo.move_files = true; + ifo.move_files = std::get<2>(GetParam()); + ifo.link_files = !ifo.move_files; ifo.failed_move_fall_back_to_copy = failed_move_fall_back_to_copy; std::string file_path = sst_files_dir_ + "file1.sst"; @@ -2251,6 +2252,13 @@ TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) { ASSERT_EQ(0, bytes_copied); ASSERT_EQ(file_size, bytes_moved); ASSERT_FALSE(copyfile); + + Status es = env_->FileExists(file_path); + if (ifo.move_files) { + ASSERT_TRUE(es.IsNotFound()); + } else { + ASSERT_OK(es); + } } else { // Link operation fails. ASSERT_EQ(0, bytes_moved); @@ -2269,6 +2277,11 @@ TEST_P(ExternSSTFileLinkFailFallbackTest, LinkFailFallBackExternalSst) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest, + ExternSSTFileLinkFailFallbackTest, + testing::Combine(testing::Bool(), testing::Bool(), + testing::Bool())); + class TestIngestExternalFileListener : public EventListener { public: void OnExternalFileIngested(DB* /*db*/, @@ -3719,19 +3732,13 @@ TEST_F(ExternalSSTFileWithTimestampTest, TimestampsNotPersistedBasic) { INSTANTIATE_TEST_CASE_P(ExternalSSTFileTest, ExternalSSTFileTest, testing::Combine(testing::Bool(), testing::Bool())); -INSTANTIATE_TEST_CASE_P(ExternSSTFileLinkFailFallbackTest, - ExternSSTFileLinkFailFallbackTest, - testing::Values(std::make_tuple(true, false), - std::make_tuple(true, true), - std::make_tuple(false, false))); - class IngestDBGeneratedFileTest : public ExternalSSTFileTestBase, public ::testing::WithParamInterface> { public: IngestDBGeneratedFileTest() { ingest_opts.allow_db_generated_files = true; - ingest_opts.move_files = std::get<0>(GetParam()); + ingest_opts.link_files = std::get<0>(GetParam()); ingest_opts.verify_checksums_before_ingest = std::get<1>(GetParam()); ingest_opts.snapshot_consistency = false; } @@ -3744,10 +3751,10 @@ INSTANTIATE_TEST_CASE_P(BasicMultiConfig, 
IngestDBGeneratedFileTest, testing::Combine(testing::Bool(), testing::Bool())); TEST_P(IngestDBGeneratedFileTest, FailureCase) { - if (encrypted_env_ && ingest_opts.move_files) { + if (encrypted_env_ && ingest_opts.link_files) { // FIXME: should fail ingestion or support this combination. ROCKSDB_GTEST_SKIP( - "Encrypted env and move_files do not work together, as we reopen the " + "Encrypted env and link_files do not work together, as we reopen the " "file after linking it which appends an extra encryption prefix."); return; } @@ -3943,7 +3950,7 @@ TEST_P(IngestDBGeneratedFileTest2, NotOverlapWithDB) { ingest_opts.allow_global_seqno = std::get<1>(GetParam()); ingest_opts.allow_blocking_flush = std::get<2>(GetParam()); ingest_opts.fail_if_not_bottommost_level = std::get<3>(GetParam()); - ingest_opts.move_files = std::get<4>(GetParam()); + ingest_opts.link_files = std::get<4>(GetParam()); do { SCOPED_TRACE("option_config_ = " + std::to_string(option_config_)); diff --git a/db/flush_job.cc b/db/flush_job.cc index 44fe86c786..6bd71dd562 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -630,7 +630,7 @@ Status FlushJob::MemPurge() { new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber()); // This addition will not trigger another flush, because - // we do not call SchedulePendingFlush(). + // we do not call EnqueuePendingFlush(). cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); new_mem->Ref(); // Piggyback FlushJobInfo on the first flushed memtable. 
@@ -861,7 +861,7 @@ Status FlushJob::WriteLevel0Table() { std::vector blob_file_additions; { - auto write_hint = cfd_->CalculateSSTWriteHint(0); + auto write_hint = base_->storage_info()->CalculateSSTWriteHint(/*level=*/0); Env::IOPriority io_priority = GetRateLimiterPriority(); db_mutex_->Unlock(); if (log_buffer_) { diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 3675a280b9..c3612656e2 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { - uint64_t min_wal_number_to_keep = 0; - assert(edit_list.size() > 0); - if (vset->db_options()->allow_2pc) { - // Note that if mempurge is successful, the edit_list will - // not be applicable (contains info of new min_log number to keep, - // and level 0 file path of SST file created during normal flush, - // so both pieces of information are irrelevant after a successful - // mempurge operation). - min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( - vset, *cfd, edit_list, memtables_to_flush, prep_tracker); - - // We piggyback the information of earliest log file to keep in the - // manifest entry for the last file flushed. + VersionEdit edit; +#ifdef ROCKSDB_ASSERT_STATUS_CHECKED + if (memtables_to_flush.size() == memlist.size()) { + // TODO(yuzhangyu): remove this testing code once the + // `GetEditForDroppingCurrentVersion` API is used by the atomic data + // replacement. This function can get the same edits for wal related + // fields, and some duplicated fields as contained already in edit_list + // for column family's recovery. 
+ edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker); } else { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); + edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); } - - VersionEdit wal_deletion; - wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep); - if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (min_wal_number_to_keep > - vset->GetWalSet().GetMinWalNumberToKeep()) { - wal_deletion.DeleteWalsBefore(min_wal_number_to_keep); - } - TEST_SYNC_POINT_CALLBACK( - "MemTableList::TryInstallMemtableFlushResults:" - "AfterComputeMinWalToKeep", - nullptr); - } - edit_list.push_back(&wal_deletion); +#else + edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); +#endif // ROCKSDB_ASSERT_STATUS_CHECKED + TEST_SYNC_POINT_CALLBACK( + "MemTableList::TryInstallMemtableFlushResults:" + "AfterComputeMinWalToKeep", + nullptr); + edit_list.push_back(&edit); const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, to_delete, mu](const Status& status) { @@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number, ResetTrimHistoryNeeded(); } +VersionEdit MemTableList::GetEditForDroppingCurrentVersion( + const ColumnFamilyData* cfd, VersionSet* vset, + LogsWithPrepTracker* prep_tracker) const { + assert(cfd); + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return VersionEdit(); + } + + uint64_t max_next_log_number = 0; + autovector edit_list; + autovector memtables_to_drop; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + memtables_to_drop.push_back(m); + max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number); + } + + // Check the obsoleted MemTables' impact on WALs related to DB's recovery (min + // log number to keep, a delta of WAL files to delete). 
+ VersionEdit edit_with_log_number; + edit_with_log_number.SetPrevLogNumber(0); + edit_with_log_number.SetLogNumber(max_next_log_number); + edit_list.push_back(&edit_with_log_number); + VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_drop, prep_tracker); + + // Set fields related to the column family's recovery. + edit.SetColumnFamily(cfd->GetID()); + edit.SetPrevLogNumber(0); + edit.SetLogNumber(max_next_log_number); + return edit; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list.h b/db/memtable_list.h index 218701e0b3..dd439de559 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -447,6 +447,12 @@ class MemTableList { void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); + // This API is only used by atomic data replacement. To get an edit for + // dropping the current `MemTableListVersion`. + VersionEdit GetEditForDroppingCurrentVersion( + const ColumnFamilyData* cfd, VersionSet* vset, + LogsWithPrepTracker* prep_tracker) const; + private: friend Status InstallMemtableAtomicFlushResults( const autovector* imm_lists, diff --git a/db/repair.cc b/db/repair.cc index c3c96fefc2..114d36a6a8 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -451,7 +451,8 @@ class Repairer { meta.file_creation_time = current_time; SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance(); - auto write_hint = cfd->CalculateSSTWriteHint(0); + auto write_hint = + cfd->current()->storage_info()->CalculateSSTWriteHint(/*level=*/0); std::vector> range_del_iters; auto range_del_iter = mem->NewRangeTombstoneIterator( diff --git a/db/version_set.cc b/db/version_set.cc index e81165a3d2..c2fa7aad79 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -44,8 +44,8 @@ #include "table/compaction_merging_iterator.h" #if USE_COROUTINES -#include "folly/experimental/coro/BlockingWait.h" -#include "folly/experimental/coro/Collect.h" +#include "folly/coro/BlockingWait.h" +#include
"folly/coro/Collect.h" #endif #include "file/filename.h" #include "file/random_access_file_reader.h" @@ -4919,6 +4919,27 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun( return false; } +Env::WriteLifeTimeHint VersionStorageInfo::CalculateSSTWriteHint( + int level) const { + if (compaction_style_ != kCompactionStyleLevel) { + return Env::WLTH_NOT_SET; + } + if (level == 0) { + return Env::WLTH_MEDIUM; + } + + // L1: medium, L2: long, ... + if (level - base_level_ >= 2) { + return Env::WLTH_EXTREME; + } else if (level < base_level_) { + // There is no restriction which prevents level passed in to be smaller + // than base_level. + return Env::WLTH_MEDIUM; + } + return static_cast( + level - base_level_ + static_cast(Env::WLTH_MEDIUM)); +} + void Version::AddLiveFiles(std::vector* live_table_files, std::vector* live_blob_files) const { assert(live_table_files); diff --git a/db/version_set.h b/db/version_set.h index 9e80b3a4c0..9336782b1e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -49,8 +49,8 @@ #include "db/write_controller.h" #include "env/file_system_tracer.h" #if USE_COROUTINES -#include "folly/experimental/coro/BlockingWait.h" -#include "folly/experimental/coro/Collect.h" +#include "folly/coro/BlockingWait.h" +#include "folly/coro/Collect.h" #endif #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" @@ -626,6 +626,8 @@ class VersionStorageInfo { const Slice& largest_user_key, int last_level, int last_l0_idx); + Env::WriteLifeTimeHint CalculateSSTWriteHint(int level) const; + private: void ComputeCompensatedSizes(); void UpdateNumNonEmptyLevels(); diff --git a/file/sst_file_manager_impl.cc b/file/sst_file_manager_impl.cc index 68c74424a2..3f4f48e284 100644 --- a/file/sst_file_manager_impl.cc +++ b/file/sst_file_manager_impl.cc @@ -99,6 +99,7 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) { size_added_by_compaction += filemeta->fd.GetFileSize(); } } + assert(cur_compactions_reserved_size_ >= 
size_added_by_compaction); cur_compactions_reserved_size_ -= size_added_by_compaction; } @@ -450,7 +451,6 @@ void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path, // File was added before, we will just update the size total_files_size_ -= tracked_file->second; total_files_size_ += file_size; - cur_compactions_reserved_size_ -= file_size; } else { total_files_size_ += file_size; } diff --git a/include/rocksdb/advanced_cache.h b/include/rocksdb/advanced_cache.h index e2aefdd011..ab9f722a05 100644 --- a/include/rocksdb/advanced_cache.h +++ b/include/rocksdb/advanced_cache.h @@ -411,6 +411,14 @@ class Cache { const CacheItemHelper* helper)>& callback, const ApplyToAllEntriesOptions& opts) = 0; + // Apply a callback to a cache handle. The Cache must ensure the lifetime + // of the key passed to the callback is valid for the duration of the + // callback. The handle may not belong to the cache, but is guaranteed to + // be type compatible. + virtual void ApplyToHandle( + Cache* cache, Handle* handle, + const std::function& callback) = 0; // Remove all entries. // Prerequisite: no entry is referenced. 
virtual void EraseUnRefEntries() = 0; @@ -636,6 +644,15 @@ class CacheWrapper : public Cache { target_->ApplyToAllEntries(callback, opts); } + virtual void ApplyToHandle( + Cache* cache, Handle* handle, + const std::function& callback) + override { + auto cache_ptr = static_cast(cache); + target_->ApplyToHandle(cache_ptr->target_.get(), handle, callback); + } + void EraseUnRefEntries() override { target_->EraseUnRefEntries(); } void StartAsyncLookup(AsyncLookupHandle& async_handle) override { diff --git a/include/rocksdb/convenience.h b/include/rocksdb/convenience.h index cff03f2bc1..d426dcfad8 100644 --- a/include/rocksdb/convenience.h +++ b/include/rocksdb/convenience.h @@ -56,7 +56,9 @@ struct ConfigOptions { // setting }; - // When true, any unused options will be ignored and OK will be returned + // When true, any unused options will be ignored and OK will be returned. + // For options files that appear to be from the current version or earlier, + // unknown options are considered corruption regardless of this setting. bool ignore_unknown_options = false; // When true, any unsupported options will be ignored and OK will be returned diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e3eb0368d0..fb272663b4 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -2131,10 +2131,16 @@ struct CompactRangeOptions { // IngestExternalFileOptions is used by IngestExternalFile() struct IngestExternalFileOptions { // Can be set to true to move the files instead of copying them. - // Note that original file links will be removed after successful ingestion, - // unless `allow_db_generated_files` is true. + // The input files will be unlinked after successful ingestion. + // The implementation depends on hard links (LinkFile) instead of traditional + // move (RenameFile) to maximize the chances to restore to the original + // state upon failure. bool move_files = false; - // If set to true, ingestion falls back to copy when move fails. 
+ // Same as move_files except that input files will NOT be unlinked. + // Only one of `move_files` and `link_files` can be set at the same time. + bool link_files = false; + // If set to true, ingestion falls back to copy when hard linking fails. + // This applies to both `move_files` and `link_files`. bool failed_move_fall_back_to_copy = true; // If set to false, an ingested file keys could appear in existing snapshots // that where created before the file was ingested. @@ -2209,8 +2215,6 @@ struct IngestExternalFileOptions { // Enables ingestion of files not generated by SstFileWriter. When true: // - Allows files to be ingested when their cf_id doesn't match the CF they // are being ingested into. - // - Preserves original file links after successful ingestion when - // `move_files = true`. // REQUIREMENTS: // - Ingested files must not overlap with existing keys. // - `write_global_seqno` must be false. diff --git a/options/options_parser.cc b/options/options_parser.cc index 4e249908be..408e6d7ab9 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -296,12 +296,13 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, return s; } - // If the option file is not generated by a higher minor version, - // there shouldn't be any unknown option. + // If the option file is not generated by a higher version, unknown + // option should only mean corruption. 
if (config_options.ignore_unknown_options && section == kOptionSectionVersion) { - if (db_version[0] < ROCKSDB_MAJOR || (db_version[0] == ROCKSDB_MAJOR && - db_version[1] <= ROCKSDB_MINOR)) { + using VTuple = std::tuple; + if (VTuple(db_version[0], db_version[1], db_version[2]) <= + VTuple(ROCKSDB_MAJOR, ROCKSDB_MINOR, ROCKSDB_PATCH)) { config_options.ignore_unknown_options = false; } } diff --git a/options/options_test.cc b/options/options_test.cc index 7fa89aec97..fa29704651 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -3449,44 +3449,8 @@ TEST_F(OptionsParserTest, DuplicateCFOptions) { } TEST_F(OptionsParserTest, IgnoreUnknownOptions) { - for (int case_id = 0; case_id < 5; case_id++) { - DBOptions db_opt; - db_opt.max_open_files = 12345; - db_opt.max_background_flushes = 301; - db_opt.max_total_wal_size = 1024; - ColumnFamilyOptions cf_opt; - - std::string version_string; - bool should_ignore = true; - if (case_id == 0) { - // same version - should_ignore = false; - version_string = std::to_string(ROCKSDB_MAJOR) + "." + - std::to_string(ROCKSDB_MINOR) + ".0"; - } else if (case_id == 1) { - // higher minor version - should_ignore = true; - version_string = std::to_string(ROCKSDB_MAJOR) + "." + - std::to_string(ROCKSDB_MINOR + 1) + ".0"; - } else if (case_id == 2) { - // higher major version. - should_ignore = true; - version_string = std::to_string(ROCKSDB_MAJOR + 1) + ".0.0"; - } else if (case_id == 3) { - // lower minor version -#if ROCKSDB_MINOR == 0 - continue; -#else - version_string = std::to_string(ROCKSDB_MAJOR) + "." + - std::to_string(ROCKSDB_MINOR - 1) + ".0"; - should_ignore = false; -#endif - } else { - // lower major version - should_ignore = false; - version_string = std::to_string(ROCKSDB_MAJOR - 1) + "." 
+ - std::to_string(ROCKSDB_MINOR) + ".0"; - } + auto testCase = [&](bool should_ignore, const std::string& version_string) { + SCOPED_TRACE(std::to_string(should_ignore) + ", " + version_string); std::string options_file_content = "# This is a testing option string.\n" @@ -3519,16 +3483,45 @@ TEST_F(OptionsParserTest, IgnoreUnknownOptions) { RocksDBOptionsParser parser; ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), false, 4096 /* readahead_size */)); + Status parse_status = parser.Parse(kTestFileName, fs_.get(), + true /* ignore_unknown_options */, + 4096 /* readahead_size */); if (should_ignore) { - ASSERT_OK(parser.Parse(kTestFileName, fs_.get(), - true /* ignore_unknown_options */, - 4096 /* readahead_size */)); + ASSERT_OK(parse_status); } else { - ASSERT_NOK(parser.Parse(kTestFileName, fs_.get(), - true /* ignore_unknown_options */, - 4096 /* readahead_size */)); + ASSERT_NOK(parse_status); } - } + }; + + // Same version + testCase(false, GetRocksVersionAsString()); + // Same except .0 patch + testCase(false, std::to_string(ROCKSDB_MAJOR) + "." + + std::to_string(ROCKSDB_MINOR) + ".0"); + // Higher major version + testCase(true, std::to_string(ROCKSDB_MAJOR + 1) + "." + + std::to_string(ROCKSDB_MINOR) + ".0"); + // Higher minor version + testCase(true, std::to_string(ROCKSDB_MAJOR) + "." + + std::to_string(ROCKSDB_MINOR + 1) + ".0"); + // Higher patch version + testCase(true, std::to_string(ROCKSDB_MAJOR) + "." + + std::to_string(ROCKSDB_MINOR) + "." + + std::to_string(ROCKSDB_PATCH + 1)); + // Lower major version + testCase(false, std::to_string(ROCKSDB_MAJOR - 1) + "." + + std::to_string(ROCKSDB_MINOR) + ".0"); +#if ROCKSDB_MINOR > 0 + // Lower minor version + testCase(false, std::to_string(ROCKSDB_MAJOR) + "." + + std::to_string(ROCKSDB_MINOR - 1) + ".0"); +#endif +#if ROCKSDB_PATCH > 0 + // Lower patch version + testCase(false, std::to_string(ROCKSDB_MAJOR) + "." + + std::to_string(ROCKSDB_MINOR - 1) + "." 
+ + std::to_string(ROCKSDB_PATCH - 1)); +#endif } TEST_F(OptionsParserTest, ParseVersion) { diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index f9bdfc9b07..fe45224d08 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -680,26 +680,12 @@ Status BlockBasedTable::Open( if (s.ok()) { s = ReadFooterFromFile(opts, file.get(), *ioptions.fs, prefetch_buffer.get(), file_size, &footer, - kBlockBasedTableMagicNumber); - } - // If the footer is corrupted and the FS supports checksum verification and - // correction, try reading the footer again - if (s.IsCorruption()) { - RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT); - if (CheckFSFeatureSupport(ioptions.fs.get(), - FSSupportedOps::kVerifyAndReconstructRead)) { - IOOptions retry_opts = opts; - retry_opts.verify_and_reconstruct_read = true; - s = ReadFooterFromFile(retry_opts, file.get(), *ioptions.fs, - prefetch_buffer.get(), file_size, &footer, - kBlockBasedTableMagicNumber); - RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT); - if (s.ok()) { - RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT); - } - } + kBlockBasedTableMagicNumber, ioptions.stats); } if (!s.ok()) { + if (s.IsCorruption()) { + RecordTick(ioptions.statistics.get(), SST_FOOTER_CORRUPTION_COUNT); + } return s; } if (!IsSupportedFormatVersion(footer.format_version())) { diff --git a/table/block_based/filter_policy.cc b/table/block_based/filter_policy.cc index 233ced7033..049148a8ad 100644 --- a/table/block_based/filter_policy.cc +++ b/table/block_based/filter_policy.cc @@ -91,9 +91,24 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder { uint64_t alt_hash = GetSliceHash64(alt); std::optional prev_key_hash; std::optional prev_alt_hash = hash_entries_info_.prev_alt_hash; + if (!hash_entries_info_.entries.empty()) { prev_key_hash = hash_entries_info_.entries.back(); } + +#ifdef 
ROCKSDB_VALGRIND_RUN + // Valgrind can report uninitialized FPs on std::optional usage. See e.g. + // https://stackoverflow.com/q/51616179 + if (!prev_key_hash.has_value()) { + std::memset((void*)&prev_key_hash, 0, sizeof(prev_key_hash)); + prev_key_hash.reset(); + } + if (!prev_alt_hash.has_value()) { + std::memset((void*)&prev_alt_hash, 0, sizeof(prev_alt_hash)); + prev_alt_hash.reset(); + } +#endif + + // Add alt first, so that entries.back() always contains previous key + // ASSUMING a change from one alt to the next implies a change to + // corresponding key @@ -295,15 +310,6 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder { bool detect_filter_construct_corruption_; struct HashEntriesInfo { -#ifdef ROCKSDB_VALGRIND_RUN - HashEntriesInfo() { - // Valgrind can report uninitialized FPs on std::optional usage. See e.g. - // https://stackoverflow.com/q/51616179 - std::memset((void*)&prev_alt_hash, 0, sizeof(prev_alt_hash)); - prev_alt_hash = {}; - } -#endif - // A deque avoids unnecessary copying of already-saved values - // and has near-minimal peak memory use.
std::deque entries; diff --git a/table/format.cc b/table/format.cc index e5ba3c6a6b..7e1c2817dd 100644 --- a/table/format.cc +++ b/table/format.cc @@ -475,10 +475,12 @@ std::string Footer::ToString() const { return result; } -Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, - FileSystem& fs, FilePrefetchBuffer* prefetch_buffer, - uint64_t file_size, Footer* footer, - uint64_t enforce_table_magic_number) { +static Status ReadFooterFromFileInternal(const IOOptions& opts, + RandomAccessFileReader* file, + FileSystem& fs, + FilePrefetchBuffer* prefetch_buffer, + uint64_t file_size, Footer* footer, + uint64_t enforce_table_magic_number) { if (file_size < Footer::kMinEncodedLength) { return Status::Corruption("file is too short (" + std::to_string(file_size) + @@ -516,6 +518,8 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, } } + TEST_SYNC_POINT_CALLBACK("ReadFooterFromFileInternal:0", &footer_input); + // Check that we actually read the whole footer from the file. It may be // that size isn't correct. 
if (footer_input.size() < Footer::kMinEncodedLength) { @@ -543,6 +547,30 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, return Status::OK(); } +Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, + FileSystem& fs, FilePrefetchBuffer* prefetch_buffer, + uint64_t file_size, Footer* footer, + uint64_t enforce_table_magic_number, + Statistics* stats) { + Status s = + ReadFooterFromFileInternal(opts, file, fs, prefetch_buffer, file_size, + footer, enforce_table_magic_number); + if (s.IsCorruption() && + CheckFSFeatureSupport(&fs, FSSupportedOps::kVerifyAndReconstructRead)) { + IOOptions new_opts = opts; + new_opts.verify_and_reconstruct_read = true; + footer->Reset(); + s = ReadFooterFromFileInternal(new_opts, file, fs, prefetch_buffer, + file_size, footer, + enforce_table_magic_number); + RecordTick(stats, FILE_READ_CORRUPTION_RETRY_COUNT); + if (s.ok()) { + RecordTick(stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT); + } + } + return s; +} + namespace { // Custom handling for the last byte of a block, to avoid invoking streaming // API to get an effective block checksum. This function is its own inverse diff --git a/table/format.h b/table/format.h index cbd6d08fa3..dac5d695be 100644 --- a/table/format.h +++ b/table/format.h @@ -186,6 +186,16 @@ class Footer { // Create empty. Populate using DecodeFrom. Footer() {} + void Reset() { + table_magic_number_ = kNullTableMagicNumber; + format_version_ = kInvalidFormatVersion; + base_context_checksum_ = 0; + metaindex_handle_ = BlockHandle::NullBlockHandle(); + index_handle_ = BlockHandle::NullBlockHandle(); + checksum_type_ = kInvalidChecksumType; + block_trailer_size_ = 0; + } + // Deserialize a footer (populate fields) from `input` and check for various // corruptions. `input_offset` is the offset within the target file of // `input` buffer, which is needed for verifying format_version >= 6 footer. 
@@ -304,7 +314,8 @@ class FooterBuilder { Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, FileSystem& fs, FilePrefetchBuffer* prefetch_buffer, uint64_t file_size, Footer* footer, - uint64_t enforce_table_magic_number = 0); + uint64_t enforce_table_magic_number = 0, + Statistics* stats = nullptr); // Computes a checksum using the given ChecksumType. Sometimes we need to // include one more input byte logically at the end but not part of the main diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 26467a2805..cc8f6bfce3 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -262,184 +262,232 @@ Status ReadTablePropertiesHelper( MemoryAllocator* memory_allocator) { assert(table_properties); - // If this is an external SST file ingested with write_global_seqno set to - // true, then we expect the checksum mismatch because checksum was written - // by SstFileWriter, but its global seqno in the properties block may have - // been changed during ingestion. For this reason, we initially read - // and process without checksum verification, then later try checksum - // verification so that if it fails, we can copy to a temporary buffer with - // global seqno set to its original value, i.e. 0, and attempt checksum - // verification again. 
- ReadOptions modified_ro = ro; - modified_ro.verify_checksums = false; - BlockContents block_contents; - BlockFetcher block_fetcher(file, prefetch_buffer, footer, modified_ro, handle, - &block_contents, ioptions, false /* decompress */, - false /*maybe_compressed*/, BlockType::kProperties, - UncompressionDict::GetEmptyDict(), - PersistentCacheOptions::kEmpty, memory_allocator); - Status s = block_fetcher.ReadBlockContents(); - if (!s.ok()) { - return s; - } - - // Unfortunately, Block::size() might not equal block_contents.data.size(), - // and Block hides block_contents - uint64_t block_size = block_contents.data.size(); - Block properties_block(std::move(block_contents)); - std::unique_ptr iter(properties_block.NewMetaIterator()); - - std::unique_ptr new_table_properties{new TableProperties}; - // All pre-defined properties of type uint64_t - std::unordered_map predefined_uint64_properties = { - {TablePropertiesNames::kOriginalFileNumber, - &new_table_properties->orig_file_number}, - {TablePropertiesNames::kDataSize, &new_table_properties->data_size}, - {TablePropertiesNames::kIndexSize, &new_table_properties->index_size}, - {TablePropertiesNames::kIndexPartitions, - &new_table_properties->index_partitions}, - {TablePropertiesNames::kTopLevelIndexSize, - &new_table_properties->top_level_index_size}, - {TablePropertiesNames::kIndexKeyIsUserKey, - &new_table_properties->index_key_is_user_key}, - {TablePropertiesNames::kIndexValueIsDeltaEncoded, - &new_table_properties->index_value_is_delta_encoded}, - {TablePropertiesNames::kFilterSize, &new_table_properties->filter_size}, - {TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size}, - {TablePropertiesNames::kRawValueSize, - &new_table_properties->raw_value_size}, - {TablePropertiesNames::kNumDataBlocks, - &new_table_properties->num_data_blocks}, - {TablePropertiesNames::kNumEntries, &new_table_properties->num_entries}, - {TablePropertiesNames::kNumFilterEntries, - 
&new_table_properties->num_filter_entries}, - {TablePropertiesNames::kDeletedKeys, - &new_table_properties->num_deletions}, - {TablePropertiesNames::kMergeOperands, - &new_table_properties->num_merge_operands}, - {TablePropertiesNames::kNumRangeDeletions, - &new_table_properties->num_range_deletions}, - {TablePropertiesNames::kFormatVersion, - &new_table_properties->format_version}, - {TablePropertiesNames::kFixedKeyLen, - &new_table_properties->fixed_key_len}, - {TablePropertiesNames::kColumnFamilyId, - &new_table_properties->column_family_id}, - {TablePropertiesNames::kCreationTime, - &new_table_properties->creation_time}, - {TablePropertiesNames::kOldestKeyTime, - &new_table_properties->oldest_key_time}, - {TablePropertiesNames::kFileCreationTime, - &new_table_properties->file_creation_time}, - {TablePropertiesNames::kSlowCompressionEstimatedDataSize, - &new_table_properties->slow_compression_estimated_data_size}, - {TablePropertiesNames::kFastCompressionEstimatedDataSize, - &new_table_properties->fast_compression_estimated_data_size}, - {TablePropertiesNames::kTailStartOffset, - &new_table_properties->tail_start_offset}, - {TablePropertiesNames::kUserDefinedTimestampsPersisted, - &new_table_properties->user_defined_timestamps_persisted}, - {TablePropertiesNames::kKeyLargestSeqno, - &new_table_properties->key_largest_seqno}, - }; - - std::string last_key; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - s = iter->status(); - if (!s.ok()) { - break; + Status s; + bool retry = false; + while (true) { + BlockContents block_contents; + size_t len = handle.size() + footer.GetBlockTrailerSize(); + // If this is an external SST file ingested with write_global_seqno set to + // true, then we expect the checksum mismatch because checksum was written + // by SstFileWriter, but its global seqno in the properties block may have + // been changed during ingestion. 
For this reason, we initially read + // and process without checksum verification, then later try checksum + // verification so that if it fails, we can copy to a temporary buffer with + // global seqno set to its original value, i.e. 0, and attempt checksum + // verification again. + if (!retry) { + ReadOptions modified_ro = ro; + modified_ro.verify_checksums = false; + BlockFetcher block_fetcher( + file, prefetch_buffer, footer, modified_ro, handle, &block_contents, + ioptions, false /* decompress */, false /*maybe_compressed*/, + BlockType::kProperties, UncompressionDict::GetEmptyDict(), + PersistentCacheOptions::kEmpty, memory_allocator); + s = block_fetcher.ReadBlockContents(); + if (!s.ok()) { + return s; + } + assert(block_fetcher.GetBlockSizeWithTrailer() == len); + TEST_SYNC_POINT_CALLBACK("ReadTablePropertiesHelper:0", + &block_contents.data); + } else { + assert(s.IsCorruption()); + // If retrying, use a stronger file system read to check and correct + // data corruption + IOOptions opts; + if (PrepareIOFromReadOptions(ro, ioptions.clock, opts) != + IOStatus::OK()) { + return s; + } + opts.verify_and_reconstruct_read = true; + std::unique_ptr data(new char[len]); + Slice result; + IOStatus io_s = + file->Read(opts, handle.offset(), len, &result, data.get(), nullptr); + RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_COUNT); + if (!io_s.ok()) { + ROCKS_LOG_INFO(ioptions.info_log, + "Reading properties block failed - %s", + io_s.ToString().c_str()); + // Return the original corruption error as that's more serious + return s; + } + if (result.size() < len) { + return Status::Corruption("Reading properties block failed - " + + std::to_string(result.size()) + + " bytes read"); + } + RecordTick(ioptions.stats, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT); + block_contents = BlockContents(std::move(data), handle.size()); } - auto key = iter->key().ToString(); - // properties block should be strictly sorted with no duplicate key. 
- if (!last_key.empty() && - BytewiseComparator()->Compare(key, last_key) <= 0) { - s = Status::Corruption("properties unsorted"); - break; - } - last_key = key; + uint64_t block_size = block_contents.data.size(); + Block properties_block(std::move(block_contents)); + // Unfortunately, Block::size() might not equal block_contents.data.size(), + // and Block hides block_contents + std::unique_ptr iter(properties_block.NewMetaIterator()); - auto raw_val = iter->value(); - auto pos = predefined_uint64_properties.find(key); + std::unique_ptr new_table_properties{new TableProperties}; + // All pre-defined properties of type uint64_t + std::unordered_map predefined_uint64_properties = { + {TablePropertiesNames::kOriginalFileNumber, + &new_table_properties->orig_file_number}, + {TablePropertiesNames::kDataSize, &new_table_properties->data_size}, + {TablePropertiesNames::kIndexSize, &new_table_properties->index_size}, + {TablePropertiesNames::kIndexPartitions, + &new_table_properties->index_partitions}, + {TablePropertiesNames::kTopLevelIndexSize, + &new_table_properties->top_level_index_size}, + {TablePropertiesNames::kIndexKeyIsUserKey, + &new_table_properties->index_key_is_user_key}, + {TablePropertiesNames::kIndexValueIsDeltaEncoded, + &new_table_properties->index_value_is_delta_encoded}, + {TablePropertiesNames::kFilterSize, &new_table_properties->filter_size}, + {TablePropertiesNames::kRawKeySize, + &new_table_properties->raw_key_size}, + {TablePropertiesNames::kRawValueSize, + &new_table_properties->raw_value_size}, + {TablePropertiesNames::kNumDataBlocks, + &new_table_properties->num_data_blocks}, + {TablePropertiesNames::kNumEntries, &new_table_properties->num_entries}, + {TablePropertiesNames::kNumFilterEntries, + &new_table_properties->num_filter_entries}, + {TablePropertiesNames::kDeletedKeys, + &new_table_properties->num_deletions}, + {TablePropertiesNames::kMergeOperands, + &new_table_properties->num_merge_operands}, + 
{TablePropertiesNames::kNumRangeDeletions, + &new_table_properties->num_range_deletions}, + {TablePropertiesNames::kFormatVersion, + &new_table_properties->format_version}, + {TablePropertiesNames::kFixedKeyLen, + &new_table_properties->fixed_key_len}, + {TablePropertiesNames::kColumnFamilyId, + &new_table_properties->column_family_id}, + {TablePropertiesNames::kCreationTime, + &new_table_properties->creation_time}, + {TablePropertiesNames::kOldestKeyTime, + &new_table_properties->oldest_key_time}, + {TablePropertiesNames::kFileCreationTime, + &new_table_properties->file_creation_time}, + {TablePropertiesNames::kSlowCompressionEstimatedDataSize, + &new_table_properties->slow_compression_estimated_data_size}, + {TablePropertiesNames::kFastCompressionEstimatedDataSize, + &new_table_properties->fast_compression_estimated_data_size}, + {TablePropertiesNames::kTailStartOffset, + &new_table_properties->tail_start_offset}, + {TablePropertiesNames::kUserDefinedTimestampsPersisted, + &new_table_properties->user_defined_timestamps_persisted}, + {TablePropertiesNames::kKeyLargestSeqno, + &new_table_properties->key_largest_seqno}, + }; - if (key == ExternalSstFilePropertyNames::kGlobalSeqno) { - new_table_properties->external_sst_file_global_seqno_offset = - handle.offset() + iter->ValueOffset(); - } + std::string last_key; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + s = iter->status(); + if (!s.ok()) { + break; + } - if (pos != predefined_uint64_properties.end()) { - if (key == TablePropertiesNames::kDeletedKeys || - key == TablePropertiesNames::kMergeOperands) { - // Insert in user-collected properties for API backwards compatibility + auto key = iter->key().ToString(); + // properties block should be strictly sorted with no duplicate key. 
+ if (!last_key.empty() && + BytewiseComparator()->Compare(key, last_key) <= 0) { + s = Status::Corruption("properties unsorted"); + break; + } + last_key = key; + + auto raw_val = iter->value(); + auto pos = predefined_uint64_properties.find(key); + + if (key == ExternalSstFilePropertyNames::kGlobalSeqno) { + new_table_properties->external_sst_file_global_seqno_offset = + handle.offset() + iter->ValueOffset(); + } + + if (pos != predefined_uint64_properties.end()) { + if (key == TablePropertiesNames::kDeletedKeys || + key == TablePropertiesNames::kMergeOperands) { + // Insert in user-collected properties for API backwards compatibility + new_table_properties->user_collected_properties.insert( + {key, raw_val.ToString()}); + } + // handle predefined rocksdb properties + uint64_t val; + if (!GetVarint64(&raw_val, &val)) { + // skip malformed value + auto error_msg = + "Detect malformed value in properties meta-block:" + "\tkey: " + + key + "\tval: " + raw_val.ToString(); + ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str()); + continue; + } + *(pos->second) = val; + } else if (key == TablePropertiesNames::kDbId) { + new_table_properties->db_id = raw_val.ToString(); + } else if (key == TablePropertiesNames::kDbSessionId) { + new_table_properties->db_session_id = raw_val.ToString(); + } else if (key == TablePropertiesNames::kDbHostId) { + new_table_properties->db_host_id = raw_val.ToString(); + } else if (key == TablePropertiesNames::kFilterPolicy) { + new_table_properties->filter_policy_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kColumnFamilyName) { + new_table_properties->column_family_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kComparator) { + new_table_properties->comparator_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kMergeOperator) { + new_table_properties->merge_operator_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kPrefixExtractorName) { + 
new_table_properties->prefix_extractor_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kPropertyCollectors) { + new_table_properties->property_collectors_names = raw_val.ToString(); + } else if (key == TablePropertiesNames::kCompression) { + new_table_properties->compression_name = raw_val.ToString(); + } else if (key == TablePropertiesNames::kCompressionOptions) { + new_table_properties->compression_options = raw_val.ToString(); + } else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) { + new_table_properties->seqno_to_time_mapping = raw_val.ToString(); + } else { + // handle user-collected properties new_table_properties->user_collected_properties.insert( {key, raw_val.ToString()}); } - // handle predefined rocksdb properties - uint64_t val; - if (!GetVarint64(&raw_val, &val)) { - // skip malformed value - auto error_msg = - "Detect malformed value in properties meta-block:" - "\tkey: " + - key + "\tval: " + raw_val.ToString(); - ROCKS_LOG_ERROR(ioptions.logger, "%s", error_msg.c_str()); - continue; + } + + // Modified version of BlockFetcher checksum verification + // (See write_global_seqno comment above) + if (s.ok() && footer.GetBlockTrailerSize() > 0) { + s = VerifyBlockChecksum(footer, properties_block.data(), block_size, + file->file_name(), handle.offset()); + if (s.IsCorruption()) { + if (new_table_properties->external_sst_file_global_seqno_offset != 0) { + std::string tmp_buf(properties_block.data(), len); + uint64_t global_seqno_offset = + new_table_properties->external_sst_file_global_seqno_offset - + handle.offset(); + EncodeFixed64(&tmp_buf[static_cast(global_seqno_offset)], 0); + s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size, + file->file_name(), handle.offset()); + } } - *(pos->second) = val; - } else if (key == TablePropertiesNames::kDbId) { - new_table_properties->db_id = raw_val.ToString(); - } else if (key == TablePropertiesNames::kDbSessionId) { - new_table_properties->db_session_id = 
raw_val.ToString(); - } else if (key == TablePropertiesNames::kDbHostId) { - new_table_properties->db_host_id = raw_val.ToString(); - } else if (key == TablePropertiesNames::kFilterPolicy) { - new_table_properties->filter_policy_name = raw_val.ToString(); - } else if (key == TablePropertiesNames::kColumnFamilyName) { - new_table_properties->column_family_name = raw_val.ToString(); - } else if (key == TablePropertiesNames::kComparator) { - new_table_properties->comparator_name = raw_val.ToString(); - } else if (key == TablePropertiesNames::kMergeOperator) { - new_table_properties->merge_operator_name = raw_val.ToString(); - } else if (key == TablePropertiesNames::kPrefixExtractorName) { - new_table_properties->prefix_extractor_name = raw_val.ToString(); - } else if (key == TablePropertiesNames::kPropertyCollectors) { - new_table_properties->property_collectors_names = raw_val.ToString(); - } else if (key == TablePropertiesNames::kCompression) { - new_table_properties->compression_name = raw_val.ToString(); - } else if (key == TablePropertiesNames::kCompressionOptions) { - new_table_properties->compression_options = raw_val.ToString(); - } else if (key == TablePropertiesNames::kSequenceNumberTimeMapping) { - new_table_properties->seqno_to_time_mapping = raw_val.ToString(); + } + + // If we detected a corruption and the file system supports verification + // and reconstruction, retry the read + if (s.IsCorruption() && !retry && + CheckFSFeatureSupport(ioptions.fs.get(), + FSSupportedOps::kVerifyAndReconstructRead)) { + retry = true; } else { - // handle user-collected properties - new_table_properties->user_collected_properties.insert( - {key, raw_val.ToString()}); - } - } - - // Modified version of BlockFetcher checksum verification - // (See write_global_seqno comment above) - if (s.ok() && footer.GetBlockTrailerSize() > 0) { - s = VerifyBlockChecksum(footer, properties_block.data(), block_size, - file->file_name(), handle.offset()); - if (s.IsCorruption()) { - if 
(new_table_properties->external_sst_file_global_seqno_offset != 0) { - std::string tmp_buf(properties_block.data(), - block_fetcher.GetBlockSizeWithTrailer()); - uint64_t global_seqno_offset = - new_table_properties->external_sst_file_global_seqno_offset - - handle.offset(); - EncodeFixed64(&tmp_buf[static_cast(global_seqno_offset)], 0); - s = VerifyBlockChecksum(footer, tmp_buf.data(), block_size, - file->file_name(), handle.offset()); + if (s.ok()) { + *table_properties = std::move(new_table_properties); } + break; } } - if (s.ok()) { - *table_properties = std::move(new_table_properties); - } - return s; } diff --git a/table/table_reader.h b/table/table_reader.h index 9faf8c1c3e..a9d46499bd 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -12,8 +12,8 @@ #include "db/range_tombstone_fragmenter.h" #if USE_COROUTINES -#include "folly/experimental/coro/Coroutine.h" -#include "folly/experimental/coro/Task.h" +#include "folly/coro/Coroutine.h" +#include "folly/coro/Task.h" #endif #include "rocksdb/slice_transform.h" #include "rocksdb/table_reader_caller.h" diff --git a/tools/check_format_compatible.sh b/tools/check_format_compatible.sh index 549abe39d6..98b8ca0946 100755 --- a/tools/check_format_compatible.sh +++ b/tools/check_format_compatible.sh @@ -125,7 +125,7 @@ EOF # To check for DB forward compatibility with loading options (old version # reading data from new), as well as backward compatibility -declare -a db_forward_with_options_refs=("8.6.fb" "8.7.fb" "8.8.fb" "8.9.fb" "8.10.fb" "8.11.fb" "9.0.fb" "9.1.fb" "9.2.fb" "9.3.fb" "9.4.fb" "9.5.fb", "9.6.fb") +declare -a db_forward_with_options_refs=("8.6.fb" "8.7.fb" "8.8.fb" "8.9.fb" "8.10.fb" "8.11.fb" "9.0.fb" "9.1.fb" "9.2.fb" "9.3.fb" "9.4.fb" "9.5.fb" "9.6.fb") # To check for DB forward compatibility without loading options (in addition # to the "with loading options" set), as well as backward compatibility declare -a db_forward_no_options_refs=() # N/A at the moment diff --git 
a/tools/db_crashtest.py b/tools/db_crashtest.py index c95851310d..038e5fd531 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -923,16 +923,22 @@ def finalize_and_sanitize(src_params): dest_params["prefixpercent"] = 0 dest_params["check_multiget_consistency"] = 0 dest_params["check_multiget_entity_consistency"] = 0 - if dest_params.get("disable_wal") == 0 and dest_params.get("reopen") > 0: - # Reopen with WAL currently requires persisting WAL data before closing for reopen. - # Previous injected WAL write errors may not be cleared by the time of closing and ready - # for persisting WAL. - # To simplify, we disable any WAL write error injection. - # TODO(hx235): support WAL write error injection with reopen - # TODO(hx235): support excluding WAL from metadata write fault injection so we don't - # have to disable metadata write fault injection to other file - dest_params["exclude_wal_from_write_fault_injection"] = 1 - dest_params["metadata_write_fault_one_in"] = 0 + if dest_params.get("disable_wal") == 0: + if dest_params.get("reopen") > 0 or (dest_params.get("manual_wal_flush_one_in") and dest_params.get("column_families") != 1): + # Reopen with WAL currently requires persisting WAL data before closing for reopen. + # Previous injected WAL write errors may not be cleared by the time of closing and ready + # for persisting WAL. + # To simplify, we disable any WAL write error injection. + # TODO(hx235): support WAL write error injection with reopen + # TODO(hx235): support excluding WAL from metadata write fault injection so we don't + # have to disable metadata write fault injection to other file + # + # WAL write failure can drop buffered WAL data. This can cause + # inconsistency when one CF has a successful flush during auto + # recovery. Disable the fault injection in this path for now until + # we have a fix that allows auto recovery. 
+ dest_params["exclude_wal_from_write_fault_injection"] = 1 + dest_params["metadata_write_fault_one_in"] = 0 if dest_params.get("disable_wal") == 1: # disableWAL and recycle_log_file_num options are not mutually # compatible at the moment diff --git a/unreleased_history/behavior_changes/ingest-live-file-with-move.md b/unreleased_history/behavior_changes/ingest-live-file-with-move.md index 303c5754e2..444a7a45e7 100644 --- a/unreleased_history/behavior_changes/ingest-live-file-with-move.md +++ b/unreleased_history/behavior_changes/ingest-live-file-with-move.md @@ -1 +1 @@ -* Support ingesting db generated files using hard link, i.e. IngestExternalFileOptions::move_files and IngestExternalFileOptions::allow_db_generated_files._ \ No newline at end of file +* Support ingesting db generated files using hard link, i.e. IngestExternalFileOptions::move_files/link_files and IngestExternalFileOptions::allow_db_generated_files. \ No newline at end of file diff --git a/unreleased_history/behavior_changes/link-file-ingest.md b/unreleased_history/behavior_changes/link-file-ingest.md new file mode 100644 index 0000000000..e9f909eee7 --- /dev/null +++ b/unreleased_history/behavior_changes/link-file-ingest.md @@ -0,0 +1 @@ +* Add a new file ingestion option `IngestExternalFileOptions::link_files` to hard link input files and preserve original files links after ingestion. \ No newline at end of file diff --git a/unreleased_history/bug_fixes/bug-refit-level.md b/unreleased_history/bug_fixes/bug-refit-level.md new file mode 100644 index 0000000000..f41e699e6a --- /dev/null +++ b/unreleased_history/bug_fixes/bug-refit-level.md @@ -0,0 +1 @@ +* Fix a bug in CompactRange() where result files may not be compacted in any future compaction. This can only happen when users configure CompactRangeOptions::change_level to true and the change level step of manual compaction fails (#13009). 
\ No newline at end of file diff --git a/util/async_file_reader.h b/util/async_file_reader.h index df69a840eb..50a5951949 100644 --- a/util/async_file_reader.h +++ b/util/async_file_reader.h @@ -7,7 +7,7 @@ #if USE_COROUTINES #include "file/random_access_file_reader.h" -#include "folly/experimental/coro/ViaIfAsync.h" +#include "folly/coro/ViaIfAsync.h" #include "port/port.h" #include "rocksdb/file_system.h" #include "rocksdb/statistics.h" diff --git a/util/coro_utils.h b/util/coro_utils.h index 5b4211135e..d6d96baa61 100644 --- a/util/coro_utils.h +++ b/util/coro_utils.h @@ -5,8 +5,8 @@ // (found in the LICENSE.Apache file in the root directory). #if defined(USE_COROUTINES) -#include "folly/experimental/coro/Coroutine.h" -#include "folly/experimental/coro/Task.h" +#include "folly/coro/Coroutine.h" +#include "folly/coro/Task.h" #endif #include "rocksdb/rocksdb_namespace.h" diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc index 6b39e2b387..d6e3f71119 100644 --- a/utilities/fault_injection_fs.cc +++ b/utilities/fault_injection_fs.cc @@ -506,10 +506,10 @@ IOStatus TestFSRandomAccessFile::ReadAsync( s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr); // TODO (low priority): fs_->ReadUnsyncedData() } else { - // If there’s no injected error, then cb will be called asynchronously when - // target_ actually finishes the read. But if there’s an injected error, it + // If there's no injected error, then cb will be called asynchronously when + // target_ actually finishes the read. But if there's an injected error, it // needs to immediately call cb(res, cb_arg) s since target_->ReadAsync() - // isn’t invoked at all. + // isn't invoked at all. res.status = res_status; cb(res, cb_arg); }