diff --git a/HISTORY.md b/HISTORY.md index 3d6384d6e8..b35ef01272 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,9 @@ * By default, max_background_flushes is 1 and flush process is removed from background compaction process. Flush process is now always executed in high priority thread pool. +* Column family support +* If you write something to the non-default column family with disableWAL = true, + you need to Flush the column family before exiting if you want your data to be persistent ## Unreleased (will be relased in 2.8) diff --git a/db/column_family.cc b/db/column_family.cc index 3a5fa067b1..17539c695b 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -185,9 +185,8 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, dropped_(false), internal_comparator_(options.comparator), internal_filter_policy_(options.filter_policy), - options_(SanitizeOptions(&internal_comparator_, &internal_filter_policy_, - options)), - full_options_(*db_options, options_), + options_(*db_options, SanitizeOptions(&internal_comparator_, + &internal_filter_policy_, options)), mem_(nullptr), imm_(options.min_write_buffer_number_to_merge), super_version_(nullptr), @@ -205,18 +204,19 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, db_options->statistics.get())); table_cache_.reset( - new TableCache(dbname, &full_options_, storage_options, table_cache)); + new TableCache(dbname, &options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { - compaction_picker_.reset(new UniversalCompactionPicker( - &options_, &internal_comparator_, db_options->info_log.get())); + compaction_picker_.reset( + new UniversalCompactionPicker(&options_, &internal_comparator_)); } else { - compaction_picker_.reset(new LevelCompactionPicker( - &options_, &internal_comparator_, db_options->info_log.get())); + compaction_picker_.reset( + new LevelCompactionPicker(&options_, &internal_comparator_)); } - Log(full_options_.info_log, "Options for column family \"%s\":\n", + Log(options_.info_log, "Options for column family \"%s\":\n", name.c_str()); - options_.Dump(full_options_.info_log.get()); + const ColumnFamilyOptions* cf_options = &options_; + cf_options->Dump(options_.info_log.get()); } } @@ -232,14 +232,27 @@ ColumnFamilyData::~ColumnFamilyData() { // it's nullptr for dummy CFD if (column_family_set_ != nullptr) { // remove from column_family_set - column_family_set_->DropColumnFamily(this); + column_family_set_->RemoveColumnFamily(this); } if (current_ != nullptr) { current_->Unref(); } - DeleteSuperVersion(); + if (super_version_ != nullptr) { + // Release SuperVersion reference kept in ThreadLocalPtr. + // This must be done outside of mutex_ since unref handler can lock mutex. + super_version_->db_mutex->Unlock(); + local_sv_.reset(); + super_version_->db_mutex->Lock(); + + bool is_last_reference __attribute__((unused)); + is_last_reference = super_version_->Unref(); + assert(is_last_reference); + super_version_->Cleanup(); + delete super_version_; + super_version_ = nullptr; + } if (dummy_versions_ != nullptr) { // List must be empty @@ -257,10 +270,6 @@ ColumnFamilyData::~ColumnFamilyData() { } } -InternalStats* ColumnFamilyData::internal_stats() { - return internal_stats_.get(); -} - void ColumnFamilyData::SetCurrent(Version* current) { current_ = current; need_slowdown_for_num_level0_files_ = @@ -320,23 +329,6 @@ void ColumnFamilyData::ResetThreadLocalSuperVersions() { } } -void ColumnFamilyData::DeleteSuperVersion() { - if (super_version_ != nullptr) { - // Release SuperVersion reference kept in ThreadLocalPtr. - // This must be done outside of mutex_ since unref handler can lock mutex. - super_version_->db_mutex->Unlock(); - local_sv_.reset(); - super_version_->db_mutex->Lock(); - - bool is_last_reference __attribute__((unused)); - is_last_reference = super_version_->Unref(); - assert(is_last_reference); - super_version_->Cleanup(); - delete super_version_; - super_version_ = nullptr; - } -} - ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const DBOptions* db_options, const EnvOptions& storage_options, @@ -345,6 +337,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, dummy_cfd_(new ColumnFamilyData(dbname, 0, "", nullptr, nullptr, ColumnFamilyOptions(), db_options, storage_options_, nullptr)), + default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), storage_options_(storage_options), @@ -367,10 +360,8 @@ ColumnFamilySet::~ColumnFamilySet() { } ColumnFamilyData* ColumnFamilySet::GetDefault() const { - auto cfd = GetColumnFamily(0); - // default column family should always exist - assert(cfd != nullptr); - return cfd; + assert(default_cfd_cache_ != nullptr); + return default_cfd_cache_; } ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { @@ -385,24 +376,13 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name) const { auto cfd_iter = column_families_.find(name); - if (cfd_iter == column_families_.end()) { + if (cfd_iter != column_families_.end()) { + auto cfd = GetColumnFamily(cfd_iter->second); + assert(cfd != nullptr); + return cfd; + } else { return nullptr; } - return GetColumnFamily(cfd_iter->second); -} - -bool ColumnFamilySet::Exists(uint32_t id) { - return column_family_data_.find(id) != column_family_data_.end(); -} - -bool ColumnFamilySet::Exists(const std::string& name) { - return column_families_.find(name) != column_families_.end(); -} - -uint32_t ColumnFamilySet::GetID(const std::string& name) { - auto cfd_iter = column_families_.find(name); - assert(cfd_iter != column_families_.end()); - return cfd_iter->second; } uint32_t ColumnFamilySet::GetNextColumnFamilyID() { @@ -434,19 +414,12 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( new_cfd->prev_ = prev; prev->next_ = new_cfd; dummy_cfd_->prev_ = new_cfd; + if (id == 0) { + default_cfd_cache_ = new_cfd; + } return new_cfd; } -// under a DB mutex -void ColumnFamilySet::DropColumnFamily(ColumnFamilyData* cfd) { - auto cfd_iter = column_family_data_.find(cfd->GetID()); - assert(cfd_iter != column_family_data_.end()); - Lock(); - column_family_data_.erase(cfd_iter); - column_families_.erase(cfd->GetName()); - Unlock(); -} - void ColumnFamilySet::Lock() { // spin lock while (spin_lock_.test_and_set(std::memory_order_acquire)) { @@ -455,11 +428,26 @@ void ColumnFamilySet::Lock() { void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } +// under a DB mutex +void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { + auto cfd_iter = column_family_data_.find(cfd->GetID()); + assert(cfd_iter != column_family_data_.end()); + Lock(); + column_family_data_.erase(cfd_iter); + column_families_.erase(cfd->GetName()); + Unlock(); +} + bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { - // maybe outside of db mutex, should lock - column_family_set_->Lock(); - current_ = column_family_set_->GetColumnFamily(column_family_id); - column_family_set_->Unlock(); + if (column_family_id == 0) { + // optimization for common case + current_ = column_family_set_->GetDefault(); + } else { + // maybe outside of db mutex, should lock + column_family_set_->Lock(); + current_ = column_family_set_->GetColumnFamily(column_family_id); + column_family_set_->Unlock(); + } handle_.SetCFD(current_); return current_ != nullptr; } @@ -474,9 +462,9 @@ MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const { return current_->mem(); } -const Options* ColumnFamilyMemTablesImpl::GetFullOptions() const { +const Options* ColumnFamilyMemTablesImpl::GetOptions() const { assert(current_ != nullptr); - return current_->full_options(); + return current_->options(); } ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { diff --git a/db/column_family.h b/db/column_family.h index 2ff2e02aba..50047be23e 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -15,6 +15,7 @@ #include #include "rocksdb/options.h" +#include "rocksdb/db.h" #include "rocksdb/env.h" #include "db/memtable_list.h" #include "db/write_batch_internal.h" @@ -35,6 +36,9 @@ class ColumnFamilyData; class DBImpl; class LogBuffer; +// ColumnFamilyHandleImpl is the class that clients use to access different +// column families. It has non-trivial destructor, which gets called when client +// is done using the column family class ColumnFamilyHandleImpl : public ColumnFamilyHandle { public: // create while holding the mutex @@ -51,7 +55,12 @@ class ColumnFamilyHandleImpl : public ColumnFamilyHandle { port::Mutex* mutex_; }; -// does not ref-count cfd_ +// Does not ref-count ColumnFamilyData +// We use this dummy ColumnFamilyHandleImpl because sometimes MemTableInserter +// calls DBImpl methods. When this happens, MemTableInserter need access to +// ColumnFamilyHandle (same as the client would need). In that case, we feed +// MemTableInserter dummy ColumnFamilyHandle and enable it to call DBImpl +// methods class ColumnFamilyHandleInternal : public ColumnFamilyHandleImpl { public: ColumnFamilyHandleInternal() @@ -110,15 +119,18 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, class ColumnFamilySet; -// column family metadata. not thread-safe. should be protected by db_mutex +// This class keeps all the data that a column family needs. It's mosly dumb and +// used just to provide access to metadata. +// Most methods require DB mutex held, unless otherwise noted class ColumnFamilyData { public: ~ColumnFamilyData(); + // thread-safe uint32_t GetID() const { return id_; } - const std::string& GetName() { return name_; } + // thread-safe + const std::string& GetName() const { return name_; } - // DB mutex held for all these void Ref() { ++refs_; } // will just decrease reference count to 0, but will not delete it. returns // true if the ref count was decreased to zero and needs to be cleaned up by @@ -127,24 +139,37 @@ class ColumnFamilyData { assert(refs_ > 0); return --refs_ == 0; } - bool Dead() { return refs_ == 0; } - // SetDropped() and IsDropped() are thread-safe + // This can only be called from single-threaded VersionSet::LogAndApply() + // After dropping column family no other operation on that column family + // will be executed. All the files and memory will be, however, kept around + // until client drops the column family handle. That way, client can still + // access data from dropped column family. + // Column family can be dropped and still alive. In that state: + // *) Column family is not included in the iteration. + // *) Compaction and flush is not executed on the dropped column family. + // *) Client can continue writing and reading from column family. However, all + // writes stay in the current memtable. + // When the dropped column family is unreferenced, then we: + // *) delete all memory associated with that column family + // *) delete all the files associated with that column family void SetDropped() { // can't drop default CF assert(id_ != 0); - dropped_.store(true); + dropped_ = true; } - bool IsDropped() const { return dropped_.load(); } + bool IsDropped() const { return dropped_; } + // thread-safe int NumberLevels() const { return options_.num_levels; } void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } uint64_t GetLogNumber() const { return log_number_; } - const ColumnFamilyOptions* options() const { return &options_; } - const Options* full_options() const { return &full_options_; } - InternalStats* internal_stats(); + // thread-safe + const Options* options() const { return &options_; } + + InternalStats* internal_stats() { return internal_stats_.get(); } MemTableList* imm() { return &imm_; } MemTable* mem() { return mem_; } @@ -154,7 +179,7 @@ class ColumnFamilyData { void SetCurrent(Version* current); void CreateNewMemtable(); - TableCache* table_cache() const { return table_cache_.get(); } + TableCache* table_cache() { return table_cache_.get(); } // See documentation in compaction_picker.h Compaction* PickCompaction(LogBuffer* log_buffer); @@ -162,18 +187,20 @@ class ColumnFamilyData { const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end); - CompactionPicker* compaction_picker() const { - return compaction_picker_.get(); - } + CompactionPicker* compaction_picker() { return compaction_picker_.get(); } + // thread-safe const Comparator* user_comparator() const { return internal_comparator_.user_comparator(); } + // thread-safe const InternalKeyComparator& internal_comparator() const { return internal_comparator_; } - SuperVersion* GetSuperVersion() const { return super_version_; } + SuperVersion* GetSuperVersion() { return super_version_; } + // thread-safe ThreadLocalPtr* GetThreadLocalSuperVersion() const { return local_sv_.get(); } + // thread-safe uint64_t GetSuperVersionNumber() const { return super_version_number_.load(); } @@ -185,9 +212,6 @@ class ColumnFamilyData { port::Mutex* db_mutex); void ResetThreadLocalSuperVersions(); - // REQUIRED: db mutex held - // Do not access column family after calling this method - void DeleteSuperVersion(); // A Flag indicating whether write needs to slowdown because of there are // too many number of level0 files. @@ -204,21 +228,18 @@ class ColumnFamilyData { const EnvOptions& storage_options, ColumnFamilySet* column_family_set); - ColumnFamilyData* next() { return next_; } - uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. Version* current_; // == dummy_versions->prev_ int refs_; // outstanding references to ColumnFamilyData - std::atomic dropped_; // true if client dropped it + bool dropped_; // true if client dropped it const InternalKeyComparator internal_comparator_; const InternalFilterPolicy internal_filter_policy_; - ColumnFamilyOptions const options_; - Options const full_options_; + Options const options_; std::unique_ptr table_cache_; @@ -258,19 +279,33 @@ class ColumnFamilyData { ColumnFamilySet* column_family_set_; }; -// Thread safe only for reading without a writer. All access should be -// locked when adding or dropping column family +// ColumnFamilySet has interesting thread-safety requirements +// * CreateColumnFamily() or RemoveColumnFamily() -- need to protect by DB +// mutex. Inside, column_family_data_ and column_families_ will be protected +// by Lock() and Unlock(). CreateColumnFamily() should ONLY be called from +// VersionSet::LogAndApply() in the normal runtime. It is also called +// during Recovery and in DumpManifest(). RemoveColumnFamily() is called +// from ColumnFamilyData destructor +// * Iteration -- hold DB mutex, but you can release it in the body of +// iteration. If you release DB mutex in body, reference the column +// family before the mutex and unreference after you unlock, since the column +// family might get dropped when the DB mutex is released +// * GetDefault() -- thread safe +// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock() +// * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() -- +// inside of DB mutex class ColumnFamilySet { public: + // ColumnFamilySet supports iteration class iterator { public: explicit iterator(ColumnFamilyData* cfd) : current_(cfd) {} iterator& operator++() { - // dummy is never dead, so this will never be infinite + // dummy is never dead or dropped, so this will never be infinite do { - current_ = current_->next(); - } while (current_->Dead()); + current_ = current_->next_; + } while (current_->refs_ == 0 || current_->IsDropped()); return *this; } bool operator!=(const iterator& other) { @@ -290,9 +325,6 @@ class ColumnFamilySet { // GetColumnFamily() calls return nullptr if column family is not found ColumnFamilyData* GetColumnFamily(uint32_t id) const; ColumnFamilyData* GetColumnFamily(const std::string& name) const; - bool Exists(uint32_t id); - bool Exists(const std::string& name); - uint32_t GetID(const std::string& name); // this call will return the next available column family ID. it guarantees // that there is no column family with id greater than or equal to the // returned value in the current running instance or anytime in RocksDB @@ -304,34 +336,33 @@ class ColumnFamilySet { ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id, Version* dummy_version, const ColumnFamilyOptions& options); - void DropColumnFamily(ColumnFamilyData* cfd); - iterator begin() { return iterator(dummy_cfd_->next()); } + iterator begin() { return iterator(dummy_cfd_->next_); } iterator end() { return iterator(dummy_cfd_); } - // ColumnFamilySet has interesting thread-safety requirements - // * CreateColumnFamily() or DropColumnFamily() -- need to protect by DB - // mutex. Inside, column_family_data_ and column_families_ will be protected - // by Lock() and Unlock() - // * Iterate -- hold DB mutex, but you can release it in the body of - // iteration. If you release DB mutex in body, reference the column - // family before the mutex and unreference after you unlock, since the column - // family might get dropped when you release the DB mutex. - // * GetDefault(), GetColumnFamily(), Exists(), GetID() -- either inside of DB - // mutex or call Lock() - // * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily() -- - // inside of DB mutex void Lock(); void Unlock(); private: - // when mutating: 1. DB mutex locked first, 2. spinlock locked second - // when reading, either: 1. lock DB mutex, or 2. lock spinlock + friend class ColumnFamilyData; + // helper function that gets called from cfd destructor + // REQUIRES: DB mutex held + void RemoveColumnFamily(ColumnFamilyData* cfd); + + // column_families_ and column_family_data_ need to be protected: + // * when mutating: 1. DB mutex locked first, 2. spinlock locked second + // * when reading, either: 1. lock DB mutex, or 2. lock spinlock // (if both, respect the ordering to avoid deadlock!) std::unordered_map column_families_; std::unordered_map column_family_data_; + uint32_t max_column_family_; ColumnFamilyData* dummy_cfd_; + // We don't hold the refcount here, since default column family always exists + // We are also not responsible for cleaning up default_cfd_cache_. This is + // just a cache that makes common case (accessing default column family) + // faster + ColumnFamilyData* default_cfd_cache_; const std::string db_name_; const DBOptions* const db_options_; @@ -340,6 +371,8 @@ class ColumnFamilySet { std::atomic_flag spin_lock_; }; +// We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access +// memtables of different column families (specified by ID in the write batch) class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { public: explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) @@ -357,7 +390,7 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { // Returns options for selected column family // REQUIRES: Seek() called first - virtual const Options* GetFullOptions() const override; + virtual const Options* GetOptions() const override; // Returns column family handle for the selected column family virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index a3b0752426..11b36eec42 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -797,7 +797,7 @@ std::string IterStatus(Iterator* iter) { } return result; } -} // namespace anonymous +} // anonymous namespace TEST(ColumnFamilyTest, NewIteratorsTest) { // iter == 0 -- no tailing diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 531426a7ad..52499f301b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -42,11 +42,9 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { } // anonymous namespace -CompactionPicker::CompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, - Logger* logger) +CompactionPicker::CompactionPicker(const Options* options, + const InternalKeyComparator* icmp) : compactions_in_progress_(options->num_levels), - logger_(logger), options_(options), num_levels_(options->num_levels), icmp_(icmp) { @@ -272,7 +270,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { - Log(logger_, + Log(options_->info_log, "Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu bytes)" "\n", (unsigned long)level, (unsigned long)(c->inputs_[0].size()), @@ -343,7 +341,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, c->inputs_[0] = inputs; if (ExpandWhileOverlapping(c) == false) { delete c; - Log(logger_, "Could not compact due to expansion failure.\n"); + Log(options_->info_log, "Could not compact due to expansion failure.\n"); return nullptr; } @@ -514,7 +512,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, } //if (i > Version::number_of_files_to_sort_) { - // Log(logger_, "XXX Looking at index %d", i); + // Log(options_->info_log, "XXX Looking at index %d", i); //} // Do not pick this file if its parents at level+1 are being compacted. @@ -610,6 +608,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, c->bottommost_level_ = true; } + // update statistics + MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, + c->inputs_[0].size()); + // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 164d23b11d..6527ef9677 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -26,8 +26,7 @@ class Version; class CompactionPicker { public: - CompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, Logger* logger); + CompactionPicker(const Options* options, const InternalKeyComparator* icmp); virtual ~CompactionPicker(); // Pick level and inputs for a new compaction. @@ -119,8 +118,7 @@ class CompactionPicker { // Per-level max bytes std::unique_ptr level_max_bytes_; - Logger* logger_; - const ColumnFamilyOptions* const options_; + const Options* const options_; private: int num_levels_; @@ -130,9 +128,9 @@ class CompactionPicker { class UniversalCompactionPicker : public CompactionPicker { public: - UniversalCompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, Logger* logger) - : CompactionPicker(options, icmp, logger) {} + UniversalCompactionPicker(const Options* options, + const InternalKeyComparator* icmp) + : CompactionPicker(options, icmp) {} virtual Compaction* PickCompaction(Version* version, LogBuffer* log_buffer) override; @@ -150,9 +148,9 @@ class UniversalCompactionPicker : public CompactionPicker { class LevelCompactionPicker : public CompactionPicker { public: - LevelCompactionPicker(const ColumnFamilyOptions* options, - const InternalKeyComparator* icmp, Logger* logger) - : CompactionPicker(options, icmp, logger) {} + LevelCompactionPicker(const Options* options, + const InternalKeyComparator* icmp) + : CompactionPicker(options, icmp) {} virtual Compaction* PickCompaction(Version* version, LogBuffer* log_buffer) override; diff --git a/db/db_impl.cc b/db/db_impl.cc index 1632d04939..437496cd05 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -276,26 +276,17 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) } DBImpl::~DBImpl() { - // Wait for background work to finish - mutex_.Lock(); - if (flush_on_destroy_) { - autovector to_delete; - for (auto cfd : *versions_->GetColumnFamilySet()) { - // TODO(icanadi) do this in ColumnFamilyData destructor - if (!cfd->IsDropped() && cfd->mem()->GetFirstSequenceNumber() != 0) { - cfd->Ref(); - mutex_.Unlock(); - FlushMemTable(cfd, FlushOptions()); - mutex_.Lock(); - if (cfd->Unref()) { - to_delete.push_back(cfd); - } - } - } - for (auto cfd : to_delete) { - delete cfd; + // only the default CFD is alive at this point + if (default_cf_handle_ != nullptr) { + auto default_cfd = default_cf_handle_->cfd(); + if (flush_on_destroy_ && + default_cfd->mem()->GetFirstSequenceNumber() != 0) { + FlushMemTable(default_cfd, FlushOptions()); } } + + mutex_.Lock(); + // Wait for background work to finish shutting_down_.Release_Store(this); // Any non-nullptr value is ok while (bg_compaction_scheduled_ || bg_flush_scheduled_ || @@ -303,8 +294,11 @@ DBImpl::~DBImpl() { bg_cv_.Wait(); } - for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->DeleteSuperVersion(); + if (default_cf_handle_ != nullptr) { + // we need to delete handle outside of lock because it does its own locking + mutex_.Unlock(); + delete default_cf_handle_; + mutex_.Lock(); } if (options_.allow_thread_local) { @@ -328,21 +322,13 @@ DBImpl::~DBImpl() { } } - mutex_.Unlock(); - if (default_cf_handle_ != nullptr) { - // we need to delete handle outside of lock because it does its own locking - delete default_cf_handle_; - } - - if (db_lock_ != nullptr) { - env_->UnlockFile(db_lock_); - } - - mutex_.Lock(); // versions need to be destroyed before table_cache since it can hold // references to table_cache. versions_.reset(); mutex_.Unlock(); + if (db_lock_ != nullptr) { + env_->UnlockFile(db_lock_); + } LogFlush(options_.info_log); } @@ -876,14 +862,8 @@ Status DBImpl::Recover( versions_->MarkFileNumberUsed(log); s = RecoverLogFile(log, &max_sequence, read_only); } - - if (s.ok()) { - if (versions_->LastSequence() < max_sequence) { - versions_->SetLastSequence(max_sequence); - } - SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, - versions_->LastSequence()); - } + SetTickerCount(options_.statistics.get(), SEQUENCE_NUMBER, + versions_->LastSequence()); } return s; @@ -1029,9 +1009,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // we must mark the next log number as used, even though it's // not actually used. that is because VersionSet assumes // VersionSet::next_file_number_ always to be strictly greater than any - // log - // number + // log number versions_->MarkFileNumberUsed(log_number + 1); + if (versions_->LastSequence() < *max_sequence) { + versions_->SetLastSequence(*max_sequence); + } status = versions_->LogAndApply(cfd, edit, &mutex_); if (!status.ok()) { return status; @@ -1059,10 +1041,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_, + s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->full_options())); + GetCompressionFlush(*cfd->options())); LogFlush(options_.info_log); mutex_.Lock(); } @@ -1124,10 +1106,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, Log(options_.info_log, "Level-0 flush table #%lu: started", (unsigned long)meta.number); - s = BuildTable(dbname_, env_, *cfd->full_options(), storage_options_, + s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), newest_snapshot, earliest_seqno_in_memtable, - GetCompressionFlush(*cfd->full_options())); + GetCompressionFlush(*cfd->options())); LogFlush(options_.info_log); delete iter; Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s", @@ -1758,7 +1740,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bool is_flush_pending = false; // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->IsDropped() && cfd->imm()->IsFlushPending()) { + if (cfd->imm()->IsFlushPending()) { is_flush_pending = true; } } @@ -1775,7 +1757,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bool is_compaction_needed = false; // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->IsDropped() && cfd->current()->NeedsCompaction()) { + if (cfd->current()->NeedsCompaction()) { is_compaction_needed = true; break; } @@ -1813,9 +1795,6 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, autovector to_delete; // refcounting in iteration for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } cfd->Ref(); Status flush_status; while (flush_status.ok() && cfd->imm()->IsFlushPending()) { @@ -1988,7 +1967,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->options()->disable_auto_compactions && !cfd->IsDropped()) { + if (!cfd->options()->disable_auto_compactions) { c.reset(cfd->PickCompaction(log_buffer)); if (c != nullptr) { // update statistics @@ -2166,12 +2145,12 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { 1.1 * cfd->compaction_picker()->MaxFileSizeForLevel( compact->compaction->output_level())); - CompressionType compression_type = GetCompressionType( - *cfd->full_options(), compact->compaction->output_level(), - compact->compaction->enable_compression()); + CompressionType compression_type = + GetCompressionType(*cfd->options(), compact->compaction->output_level(), + compact->compaction->enable_compression()); compact->builder.reset( - NewTableBuilder(*cfd->full_options(), cfd->internal_comparator(), + NewTableBuilder(*cfd->options(), cfd->internal_comparator(), compact->outfile.get(), compression_type)); } LogFlush(options_.info_log); @@ -2788,8 +2767,8 @@ std::pair DBImpl::GetTailingIteratorPair( Iterator* mutable_iter = super_version->mem->NewIterator(options); // create a DBIter that only uses memtable content; see NewIterator() mutable_iter = - NewDBIterator(&dbname_, env_, *cfd->full_options(), - cfd->user_comparator(), mutable_iter, kMaxSequenceNumber); + NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), + mutable_iter, kMaxSequenceNumber); std::vector list; super_version->imm->AddIterators(options, &list); @@ -2799,8 +2778,8 @@ std::pair DBImpl::GetTailingIteratorPair( // create a DBIter that only uses memtable content; see NewIterator() immutable_iter = - NewDBIterator(&dbname_, env_, *cfd->full_options(), - cfd->user_comparator(), immutable_iter, kMaxSequenceNumber); + NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), + immutable_iter, kMaxSequenceNumber); // register cleanups mutable_iter->RegisterCleanup(CleanupIteratorState, @@ -2937,11 +2916,10 @@ Status DBImpl::GetImpl(const ReadOptions& options, // merge_operands will contain the sequence of merges in the latter case. LookupKey lkey(key, snapshot); BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer); - if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->full_options())) { + if (sv->mem->Get(lkey, value, &s, merge_context, *cfd->options())) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); - } else if (sv->imm->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + } else if (sv->imm->Get(lkey, value, &s, merge_context, *cfd->options())) { // Done RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { @@ -2950,7 +2928,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, StartPerfTimer(&from_files_timer); sv->current->Get(options, lkey, value, &s, &merge_context, &stats, - *cfd->full_options(), value_found); + *cfd->options(), value_found); have_stat_update = true; BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer); RecordTick(options_.statistics.get(), MEMTABLE_MISS); @@ -3072,14 +3050,14 @@ std::vector DBImpl::MultiGet( auto super_version = mgd->super_version; auto cfd = mgd->cfd; if (super_version->mem->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + *cfd->options())) { // Done } else if (super_version->imm->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + *cfd->options())) { // Done } else { super_version->current->Get(options, lkey, value, &s, &merge_context, - &mgd->stats, *cfd->full_options()); + &mgd->stats, *cfd->options()); mgd->have_stat_update = true; } @@ -3134,7 +3112,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, *handle = nullptr; MutexLock l(&mutex_); - if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { + if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) != + nullptr) { return Status::InvalidArgument("Column family already exists"); } VersionEdit edit; @@ -3144,6 +3123,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, edit.SetLogNumber(logfile_number_); edit.SetComparatorName(options.comparator->Name()); + // LogAndApply will both write the creation in MANIFEST and create + // ColumnFamilyData object Status s = versions_->LogAndApply(nullptr, &edit, &mutex_, db_directory_.get(), false, &options); if (s.ok()) { @@ -3184,9 +3165,10 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { } if (s.ok()) { + assert(cfd->IsDropped()); Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); // Flush the memtables. This will make all WAL files referencing dropped - // column family to be obsolete. They will be deleted when user deletes + // column family to be obsolete. They will be deleted once user deletes // column family handle Write(WriteOptions(), nullptr); // ignore error } else { @@ -3237,7 +3219,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot; - iter = NewDBIterator(&dbname_, env_, *cfd->full_options(), + iter = NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), iter, snapshot); } @@ -3292,7 +3274,7 @@ Status DBImpl::NewIterators( : latest_snapshot; auto iter = NewInternalIterator(options, cfd, super_versions[i]); - iter = NewDBIterator(&dbname_, env_, *cfd->full_options(), + iter = NewDBIterator(&dbname_, env_, *cfd->options(), cfd->user_comparator(), iter, snapshot); iterators->push_back(iter); } @@ -3364,9 +3346,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { autovector to_delete; // refcounting cfd in iteration for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } cfd->Ref(); // May temporarily unlock and wait. status = MakeRoomForWrite(cfd, my_batch == nullptr); @@ -3586,6 +3565,8 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { // Yield previous error s = bg_error_; break; + } else if (cfd->IsDropped()) { + break; } else if (allow_delay && cfd->NeedSlowdownForNumLevel0Files()) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several @@ -3786,7 +3767,7 @@ Env* DBImpl::GetEnv() const { const Options& DBImpl::GetOptions(ColumnFamilyHandle* column_family) const { auto cfh = reinterpret_cast(column_family); - return *cfh->cfd()->full_options(); + return *cfh->cfd()->options(); } bool DBImpl::GetProperty(ColumnFamilyHandle* column_family, @@ -4058,24 +4039,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, VersionEdit edit; impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); - // We use this LogAndApply just to store the next file number, the one - // that we used by calling impl->versions_->NewFileNumber() - // The used log number are already written to manifest in RecoverLogFile() - // method - s = impl->versions_->LogAndApply(impl->default_cf_handle_->cfd(), &edit, - &impl->mutex_, - impl->db_directory_.get()); - } - if (s.ok()) { + // set column family handles for (auto cf : column_families) { - if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (cfd == nullptr) { s = Status::InvalidArgument("Column family not found: ", cf.name); break; } - uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name); - auto cfd = impl->versions_->GetColumnFamilySet()->GetColumnFamily(id); - assert(cfd != nullptr); handles->push_back( new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); } diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 15725b335b..c1a8eef364 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -63,11 +63,11 @@ Status DBImplReadOnly::Get(const ReadOptions& options, MergeContext merge_context; LookupKey lkey(key, snapshot); if (super_version->mem->Get(lkey, value, &s, merge_context, - *cfd->full_options())) { + *cfd->options())) { } else { Version::GetStats stats; super_version->current->Get(options, lkey, value, &s, &merge_context, - &stats, *cfd->full_options()); + &stats, *cfd->options()); } return s; } @@ -80,8 +80,7 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options, SequenceNumber latest_snapshot = versions_->LastSequence(); Iterator* internal_iter = NewInternalIterator(options, cfd, super_version); return NewDBIterator( - &dbname_, env_, *cfd->full_options(), cfd->user_comparator(), - internal_iter, + &dbname_, env_, *cfd->options(), cfd->user_comparator(), internal_iter, (options.snapshot != nullptr ? reinterpret_cast(options.snapshot)->number_ : latest_snapshot)); diff --git a/db/memtable.cc b/db/memtable.cc index b29f9339c8..d0f54ce4bd 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -29,7 +29,7 @@ namespace rocksdb { MemTable::MemTable(const InternalKeyComparator& cmp, - const ColumnFamilyOptions& options) + const Options& options) : comparator_(cmp), refs_(0), arena_(options.arena_block_size), diff --git a/db/memtable.h b/db/memtable.h index cf93aece88..38602e3aa9 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -39,7 +39,7 @@ class MemTable { // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, - const ColumnFamilyOptions& options); + const Options& options); ~MemTable(); diff --git a/db/repair.cc b/db/repair.cc index 597ddd6580..4e0d025c02 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -108,7 +108,6 @@ class Repairer { InternalKeyComparator const icmp_; InternalFilterPolicy const ipolicy_; Options const options_; - ColumnFamilyOptions const cf_options_; std::shared_ptr raw_table_cache_; TableCache* table_cache_; VersionEdit* edit_; diff --git a/db/version_set.cc b/db/version_set.cc index 15411e2862..08cf33a8b1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -249,7 +249,7 @@ bool Version::PrefixMayMatch(const ReadOptions& options, Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { auto table_cache = cfd_->table_cache(); - auto options = cfd_->full_options(); + auto options = cfd_->options(); for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { auto fname = TableFileName(vset_->dbname_, file_meta->number); @@ -1491,11 +1491,6 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, assert(column_family_data != nullptr || edit->is_column_family_add_); - if (column_family_data != nullptr && column_family_data->IsDropped()) { - // if column family is dropped no need to write anything to the manifest - // (unless, of course, thit is the drop column family write) - return Status::OK(); - } if (edit->is_column_family_drop_) { // if we drop column family, we have to make sure to save max column family, // so that we don't reuse existing ID @@ -1511,6 +1506,16 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (w.done) { return w.status; } + if (column_family_data != nullptr && column_family_data->IsDropped()) { + // if column family is dropped by the time we get here, no need to write + // anything to the manifest + manifest_writers_.pop_front(); + // Notify new head of write queue + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } + return Status::OK(); + } std::vector batch_edits; Version* v = nullptr; @@ -2353,9 +2358,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { - continue; - } { // Store column family info VersionEdit edit; @@ -2401,19 +2403,18 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { } } - // save max column family to avoid reusing the same column - // family ID for two different column families - if (column_family_set_->GetMaxColumnFamily() > 0) { + { + // persist max column family, last sequence and next file VersionEdit edit; - edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + if (column_family_set_->GetMaxColumnFamily() > 0) { + edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + } + edit.SetLastSequence(last_sequence_); + edit.SetNextFile(next_file_number_); std::string record; edit.EncodeTo(&record); - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; - } + return log->AddRecord(record); } - return Status::OK(); } // Opens the mainfest file and reads all records diff --git a/db/write_batch.cc b/db/write_batch.cc index 5747c53021..da85412240 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -289,7 +289,7 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - const Options* options = cf_mems_->GetFullOptions(); + const Options* options = cf_mems_->GetOptions(); if (!options->inplace_update_support) { mem->Add(sequence_, kTypeValue, key, value); } else if (options->inplace_callback == nullptr) { @@ -344,7 +344,7 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - const Options* options = cf_mems_->GetFullOptions(); + const Options* options = cf_mems_->GetOptions(); bool perform_merge = false; if (options->max_successive_merges > 0 && db_ != nullptr) { @@ -413,7 +413,7 @@ class MemTableInserter : public WriteBatch::Handler { return seek_status; } MemTable* mem = cf_mems_->GetMemTable(); - const Options* options = cf_mems_->GetFullOptions(); + const Options* options = cf_mems_->GetOptions(); if (!dont_filter_deletes_ && options->filter_deletes) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index e5ee045cdf..793ee3e0ee 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -26,7 +26,7 @@ class ColumnFamilyMemTables { // been processed) virtual uint64_t GetLogNumber() const = 0; virtual MemTable* GetMemTable() const = 0; - virtual const Options* GetFullOptions() const = 0; + virtual const Options* GetOptions() const = 0; virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; }; @@ -47,7 +47,7 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { return mem_; } - const Options* GetFullOptions() const override { + const Options* GetOptions() const override { assert(ok_); return options_; } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 773823216f..c6e17e8b73 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -24,7 +24,7 @@ static std::string PrintContents(WriteBatch* b) { auto factory = std::make_shared(); Options options; options.memtable_factory = factory; - MemTable* mem = new MemTable(cmp, ColumnFamilyOptions(options)); + MemTable* mem = new MemTable(cmp, options); mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); diff --git a/table/table_test.cc b/table/table_test.cc index aa874eb183..18ae2a3aa2 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -410,8 +410,7 @@ class MemTableConstructor: public Constructor { table_factory_(new SkipListFactory) { Options options; options.memtable_factory = table_factory_; - memtable_ = - new MemTable(internal_comparator_, ColumnFamilyOptions(options)); + memtable_ = new MemTable(internal_comparator_, options); memtable_->Ref(); } ~MemTableConstructor() {