From 511b03a5b5bff61fe210a091002640f1a997d5b7 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 27 Jan 2014 13:55:47 -0800 Subject: [PATCH] LogAndApply to take ColumnFamilyData Summary: This removes the default implementation of LogAndApply that applied the changed to the default column family by default. It is mostly simple reformatting. Test Plan: make check Reviewers: dhruba, kailiu CC: leveldb Differential Revision: https://reviews.facebook.net/D15465 --- db/db_impl.cc | 26 +++++++++++++++----------- db/db_test.cc | 5 +++-- db/memtablelist.cc | 6 +++--- db/memtablelist.h | 14 +++++++------- db/version_set.cc | 3 ++- db/version_set.h | 7 ------- 6 files changed, 30 insertions(+), 31 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index de42887366..680a07ff05 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1009,7 +1009,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // Since we already recovered log_number, we want all logs // with numbers `<= log_number` (includes this one) to be ignored edit.SetLogNumber(log_number + 1); - status = versions_->LogAndApply(&edit, &mutex_); + status = versions_->LogAndApply(default_cfd_, &edit, &mutex_); } return status; @@ -1204,8 +1204,9 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Replace immutable memtable with the generated Table s = default_cfd_->imm.InstallMemtableFlushResults( - mems, versions_.get(), s, &mutex_, options_.info_log.get(), file_number, - pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get()); + default_cfd_, mems, versions_.get(), s, &mutex_, options_.info_log.get(), + file_number, pending_outputs_, &deletion_state.memtables_to_free, + db_directory_.get()); if (s.ok()) { InstallSuperVersion(default_cfd_, deletion_state); @@ -1333,7 +1334,8 @@ Status DBImpl::ReFitLevel(int level, int target_level) { Log(options_.info_log, "Apply version edit:\n%s", edit.DebugString().data()); - status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); + status = versions_->LogAndApply(default_cfd_, &edit, &mutex_, + db_directory_.get()); superversion_to_free = InstallSuperVersion(default_cfd_, new_superversion); new_superversion = nullptr; @@ -1906,7 +1908,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); - status = versions_->LogAndApply(c->edit(), &mutex_, db_directory_.get()); + status = versions_->LogAndApply(default_cfd_, c->edit(), &mutex_, + db_directory_.get()); InstallSuperVersion(default_cfd_, deletion_state); Version::LevelSummaryStorage tmp; @@ -2155,8 +2158,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { compact->compaction->output_level(), out.number, out.file_size, out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); } - return versions_->LogAndApply(compact->compaction->edit(), &mutex_, - db_directory_.get()); + return versions_->LogAndApply(default_cfd_, compact->compaction->edit(), + &mutex_, db_directory_.get()); } // @@ -2949,7 +2952,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, edit.AddColumnFamily(column_family_name); handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(handle->id); - Status s = versions_->LogAndApply(&edit, &mutex_); + Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); if (s.ok()) { // add to internal data structures versions_->CreateColumnFamily(options, &edit); @@ -2968,7 +2971,7 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { VersionEdit edit; edit.DropColumnFamily(); edit.SetColumnFamily(column_family.id); - Status s = versions_->LogAndApply(&edit, &mutex_); + Status s = versions_->LogAndApply(default_cfd_, &edit, &mutex_); if (s.ok()) { // remove from internal data structures versions_->DropColumnFamily(&edit); @@ -3830,7 +3833,8 @@ Status DBImpl::DeleteFile(std::string name) { } } edit.DeleteFile(level, number); - status = versions_->LogAndApply(&edit, &mutex_, db_directory_.get()); + status = versions_->LogAndApply(default_cfd_, &edit, &mutex_, + db_directory_.get()); if (status.ok()) { InstallSuperVersion(default_cfd_, deletion_state); } @@ -3977,7 +3981,7 @@ Status DB::OpenWithColumnFamilies( edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); - s = impl->versions_->LogAndApply(&edit, &impl->mutex_, + s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_, impl->db_directory_.get()); } if (s.ok()) { diff --git a/db/db_test.cc b/db/db_test.cc index df1d0a5c23..acf46478c9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5046,6 +5046,7 @@ void BM_LogAndApply(int iters, int num_base_files) { std::vector dummy; dummy.push_back(ColumnFamilyDescriptor()); ASSERT_OK(vset.Recover(dummy)); + auto default_cfd = vset.GetColumnFamilySet()->GetDefault(); VersionEdit vbase; uint64_t fnum = 1; for (int i = 0; i < num_base_files; i++) { @@ -5053,7 +5054,7 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); } - ASSERT_OK(vset.LogAndApply(&vbase, &mu)); + ASSERT_OK(vset.LogAndApply(default_cfd, &vbase, &mu)); uint64_t start_micros = env->NowMicros(); @@ -5063,7 +5064,7 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); - vset.LogAndApply(&vedit, &mu); + vset.LogAndApply(default_cfd, &vedit, &mu); } uint64_t stop_micros = env->NowMicros(); unsigned int us = stop_micros - start_micros; diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 9bd69b1af1..cbfb7c85e9 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -122,8 +122,8 @@ void MemTableList::PickMemtablesToFlush(std::vector* ret) { // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - const std::vector& mems, VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, uint64_t file_number, + ColumnFamilyData* cfd, const std::vector& mems, VersionSet* vset, + Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, std::set& pending_outputs, std::vector* to_delete, Directory* db_directory) { mu->AssertHeld(); @@ -177,7 +177,7 @@ Status MemTableList::InstallMemtableFlushResults( (unsigned long)m->file_number_); // this can release and reacquire the mutex. - s = vset->LogAndApply(&m->edit_, mu, db_directory); + s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable diff --git a/db/memtablelist.h b/db/memtablelist.h index 57c4149296..d4fee3afd5 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -8,6 +8,7 @@ #include #include #include +#include #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/iterator.h" @@ -17,6 +18,7 @@ namespace rocksdb { +class ColumnFamilyData; class InternalKeyComparator; class Mutex; @@ -90,13 +92,11 @@ class MemTableList { void PickMemtablesToFlush(std::vector* mems); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(const std::vector& m, - VersionSet* vset, Status flushStatus, - port::Mutex* mu, Logger* info_log, - uint64_t file_number, - std::set& pending_outputs, - std::vector* to_delete, - Directory* db_directory); + Status InstallMemtableFlushResults( + ColumnFamilyData* cfd, const std::vector& m, VersionSet* vset, + Status flushStatus, port::Mutex* mu, Logger* info_log, + uint64_t file_number, std::set& pending_outputs, + std::vector* to_delete, Directory* db_directory); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/version_set.cc b/db/version_set.cc index 9e64759ee8..2580233e37 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1973,7 +1973,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, VersionEdit ve; port::Mutex dummy_mutex; MutexLock l(&dummy_mutex); - return versions.LogAndApply(&ve, &dummy_mutex, nullptr, true); + return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve, + &dummy_mutex, nullptr, true); } Status VersionSet::DumpManifest(Options& options, std::string& dscname, diff --git a/db/version_set.h b/db/version_set.h index 2a9bf6d916..308228d7b3 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -292,13 +292,6 @@ class VersionSet { port::Mutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false); - Status LogAndApply(VersionEdit* edit, port::Mutex* mu, - Directory* db_directory = nullptr, - bool new_descriptor_log = false) { - return LogAndApply(column_family_set_->GetDefault(), edit, mu, db_directory, - new_descriptor_log); - } - // Recover the last saved descriptor from persistent storage. Status Recover(const std::vector& column_families);