diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index eeaa691237..c894023b96 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -646,6 +646,63 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { } } +TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = GetParam(); + options.write_buffer_size = (static_cast(64) << 20); + CreateAndReopenWithCF({"pikachu"}, options); + + const size_t num_cfs = handles_.size(); + ASSERT_EQ(num_cfs, 2); + WriteOptions wopts; + for (size_t i = 0; i != num_cfs; ++i) { + ASSERT_OK(Put(static_cast(i) /*cf*/, "key", "value", wopts)); + } + + { + // Flush the default CF only. + std::vector cf_ids{0}; + ASSERT_OK(Flush(cf_ids)); + + autovector flushed_cfds; + autovector> flush_edits; + auto flushed_cfh = static_cast(handles_[0]); + flushed_cfds.push_back(flushed_cfh->cfd()); + flush_edits.push_back({}); + auto unflushed_cfh = static_cast(handles_[1]); + + ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(), + flushed_cfds, flush_edits), + unflushed_cfh->cfd()->GetLogNumber()); + } + + { + // Flush all CFs. + std::vector cf_ids; + for (size_t i = 0; i != num_cfs; ++i) { + cf_ids.emplace_back(static_cast(i)); + } + ASSERT_OK(Flush(cf_ids)); + uint64_t log_num_after_flush = dbfull()->TEST_GetCurrentLogNumber(); + + uint64_t min_log_number_to_keep = port::kMaxUint64; + autovector flushed_cfds; + autovector> flush_edits; + for (size_t i = 0; i != num_cfs; ++i) { + auto cfh = static_cast(handles_[i]); + flushed_cfds.push_back(cfh->cfd()); + flush_edits.push_back({}); + min_log_number_to_keep = + std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber()); + } + ASSERT_EQ(min_log_number_to_keep, log_num_after_flush); + ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->TEST_GetVersionSet(), + flushed_cfds, flush_edits), + min_log_number_to_keep); + } +} + TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 2f48c3c266..66aa9be959 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1017,6 +1017,12 @@ class DBImpl : public DB { VersionSet* TEST_GetVersionSet() const { return versions_.get(); } + uint64_t TEST_GetCurrentLogNumber() const { + InstrumentedMutexLock l(mutex()); + assert(!logs_.empty()); + return logs_.back().number; + } + const std::unordered_set& TEST_GetFilesGrabbedForPurge() const { return files_grabbed_for_purge_; } @@ -2225,6 +2231,10 @@ extern uint64_t PrecomputeMinLogNumberToKeep2PC( extern uint64_t PrecomputeMinLogNumberToKeepNon2PC( VersionSet* vset, const ColumnFamilyData& cfd_to_flush, const autovector& edit_list); +// For atomic flush. +extern uint64_t PrecomputeMinLogNumberToKeepNon2PC( + VersionSet* vset, const autovector& cfds_to_flush, + const autovector>& edit_lists); // `cfd_to_flush` is the column family whose memtable will be flushed and thus // will not depend on any WAL file. nullptr means no memtable is being flushed. diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index f67e8f748a..35eb0e182e 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -6,16 +6,17 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "db/db_impl/db_impl.h" - #include #include #include + +#include "db/db_impl/db_impl.h" #include "db/event_helpers.h" #include "db/memtable_list.h" #include "file/file_util.h" #include "file/filename.h" #include "file/sst_file_manager_impl.h" +#include "port/port.h" #include "util/autovector.h" namespace ROCKSDB_NAMESPACE { @@ -710,6 +711,42 @@ uint64_t PrecomputeMinLogNumberToKeepNon2PC( return min_log_number_to_keep; } +uint64_t PrecomputeMinLogNumberToKeepNon2PC( + VersionSet* vset, const autovector& cfds_to_flush, + const autovector>& edit_lists) { + assert(vset != nullptr); + assert(!cfds_to_flush.empty()); + assert(cfds_to_flush.size() == edit_lists.size()); + + uint64_t min_log_number_to_keep = port::kMaxUint64; + for (const auto& edit_list : edit_lists) { + uint64_t log = 0; + for (const auto& e : edit_list) { + if (e->HasLogNumber()) { + log = std::max(log, e->GetLogNumber()); + } + } + if (log != 0) { + min_log_number_to_keep = std::min(min_log_number_to_keep, log); + } + } + if (min_log_number_to_keep == port::kMaxUint64) { + min_log_number_to_keep = cfds_to_flush[0]->GetLogNumber(); + for (size_t i = 1; i < cfds_to_flush.size(); i++) { + min_log_number_to_keep = + std::min(min_log_number_to_keep, cfds_to_flush[i]->GetLogNumber()); + } + } + + std::unordered_set flushed_cfds( + cfds_to_flush.begin(), cfds_to_flush.end()); + min_log_number_to_keep = + std::min(min_log_number_to_keep, + vset->PreComputeMinLogNumberWithUnflushedData(flushed_cfds)); + + return min_log_number_to_keep; +} + uint64_t PrecomputeMinLogNumberToKeep2PC( VersionSet* vset, const ColumnFamilyData& cfd_to_flush, const autovector& edit_list, diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 77c2ba51e3..f2974ec04e 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -757,12 +757,7 @@ Status InstallMemtableAtomicFlushResults( std::unique_ptr wal_deletion; if (vset->db_options()->track_and_verify_wals_in_manifest) { uint64_t min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[0], edit_lists[0]); - for (size_t i = 1; i < cfds.size(); i++) { - min_wal_number_to_keep = std::min( - min_wal_number_to_keep, - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[i], edit_lists[i])); - } + PrecomputeMinLogNumberToKeepNon2PC(vset, cfds, edit_lists); const auto& wals = vset->GetWalSet().GetWals(); if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) { wal_deletion.reset(new VersionEdit); diff --git a/db/version_set.h b/db/version_set.h index 09e54f6af5..86eeea614b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include @@ -1136,6 +1137,23 @@ class VersionSet { } return min_log_num; } + // Returns the minimum log number which still has data not flushed to any SST + // file, except data from `cfds_to_skip`. + uint64_t PreComputeMinLogNumberWithUnflushedData( + const std::unordered_set& cfds_to_skip) const { + uint64_t min_log_num = port::kMaxUint64; + for (auto cfd : *column_family_set_) { + if (cfds_to_skip.count(cfd)) { + continue; + } + // It's safe to ignore dropped column families here: + // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST. + if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { + min_log_num = cfd->GetLogNumber(); + } + } + return min_log_num; + } // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed.