diff --git a/db/column_family.cc b/db/column_family.cc index 77e2240004..afbf69e9be 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -69,7 +69,8 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, options(options), mem(nullptr), imm(options.min_write_buffer_number_to_merge), - super_version(nullptr) {} + super_version(nullptr), + log_number(0) {} ColumnFamilyData::~ColumnFamilyData() { if (super_version != nullptr) { @@ -167,4 +168,18 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) { column_family_data_.erase(cfd); } +MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) { + auto cfd = column_family_set_->GetColumnFamily(column_family_id); + // TODO(icanadi): this should not be asserting. Rather, it should somehow + // return Corruption status back to the Iterator. This will require + // API change in WriteBatch::Handler, which is a public API + assert(cfd != nullptr); + + if (log_number_ == 0 || log_number_ >= cfd->log_number) { + return cfd->mem; + } else { + return nullptr; + } +} + } // namespace rocksdb diff --git a/db/column_family.h b/db/column_family.h index b5235d3dfd..44e459e368 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -15,6 +15,7 @@ #include "rocksdb/options.h" #include "db/memtablelist.h" +#include "db/write_batch_internal.h" namespace rocksdb { @@ -63,6 +64,11 @@ struct ColumnFamilyData { MemTableList imm; SuperVersion* super_version; + // This is the earliest log file number that contains data from this + // Column Family. All earlier log files must be ignored and not + // recovered from + uint64_t log_number; + ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, const ColumnFamilyOptions& options); ~ColumnFamilyData(); @@ -122,4 +128,24 @@ class ColumnFamilySet { uint32_t max_column_family_; }; +class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { + public: + explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) + : column_family_set_(column_family_set), log_number_(0) {} + + // If column_family_data->log_number is bigger than log_number, + // the memtable will not be returned. + // If log_number == 0, the memtable will be always returned + void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } + + // Returns the column families memtable if log_number == 0 || log_number <= + // column_family_data->log_number. + // If column family doesn't exist, it asserts + virtual MemTable* GetMemTable(uint32_t column_family_id) override; + + private: + ColumnFamilySet* column_family_set_; + uint64_t log_number_; +}; + } // namespace rocksdb diff --git a/db/column_family_test.cc b/db/column_family_test.cc index fc278ecf30..eb48f6cff4 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -8,8 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_impl.h" +#include "rocksdb/env.h" #include "rocksdb/db.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" #include #include @@ -22,10 +24,10 @@ using namespace std; class ColumnFamilyTest { public: ColumnFamilyTest() { + env_ = Env::Default(); dbname_ = test::TmpDir() + "/column_family_test"; db_options_.create_if_missing = true; - options_.create_if_missing = true; - DestroyDB(dbname_, options_); + DestroyDB(dbname_, Options(db_options_, column_family_options_)); } void Close() { @@ -37,18 +39,77 @@ class ColumnFamilyTest { vector column_families; for (auto x : cf) { column_families.push_back( - ColumnFamilyDescriptor(x, ColumnFamilyOptions())); + ColumnFamilyDescriptor(x, column_family_options_)); } - vector handles; return DB::OpenWithColumnFamilies(db_options_, dbname_, column_families, - &handles, &db_); + &handles_, &db_); } - Options options_; + void Destroy() { + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_))); + } + + void CreateColumnFamilies(const vector& cfs) { + int cfi = handles_.size(); + handles_.resize(cfi + cfs.size()); + for (auto cf : cfs) { + ASSERT_OK(db_->CreateColumnFamily(column_family_options_, cf, + &handles_[cfi++])); + } + } + + Status Put(int cf, const string& key, const string& value) { + return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value)); + } + Status Merge(int cf, const string& key, const string& value) { + return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value)); + } + + string Get(int cf, const string& key) { + ReadOptions options; + options.verify_checksums = true; + string result; + Status s = db_->Get(options, handles_[cf], Slice(key), &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + void CopyFile(const string& source, const string& destination, + uint64_t size = 0) { + const EnvOptions soptions; + unique_ptr srcfile; + ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions)); + unique_ptr destfile; + ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions)); + + if (size == 0) { + // default argument means copy everything + ASSERT_OK(env_->GetFileSize(source, &size)); + } + + char buffer[4096]; + Slice slice; + while (size > 0) { + uint64_t one = min(uint64_t(sizeof(buffer)), size); + ASSERT_OK(srcfile->Read(one, &slice, buffer)); + ASSERT_OK(destfile->Append(slice)); + size -= slice.size(); + } + ASSERT_OK(destfile->Close()); + } + + vector handles_; ColumnFamilyOptions column_family_options_; DBOptions db_options_; string dbname_; DB* db_; + Env* env_; }; TEST(ColumnFamilyTest, AddDrop) { @@ -74,6 +135,108 @@ TEST(ColumnFamilyTest, AddDrop) { ASSERT_TRUE(families == vector({"default", "four", "one", "three"})); } +TEST(ColumnFamilyTest, ReadWrite) { + ASSERT_OK(Open({"default"})); + CreateColumnFamilies({"one", "two"}); + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + ASSERT_OK(Put(0, "foo", "v1")); + ASSERT_OK(Put(0, "bar", "v2")); + ASSERT_OK(Put(1, "mirko", "v3")); + ASSERT_OK(Put(0, "foo", "v2")); + ASSERT_OK(Put(2, "fodor", "v5")); + + for (int iter = 0; iter <= 3; ++iter) { + ASSERT_EQ("v2", Get(0, "foo")); + ASSERT_EQ("v2", Get(0, "bar")); + ASSERT_EQ("v3", Get(1, "mirko")); + ASSERT_EQ("v5", Get(2, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(0, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(2, "foo")); + if (iter <= 1) { + // reopen + Close(); + ASSERT_OK(Open({"default", "one", "two"})); + } + } + Close(); +} + +TEST(ColumnFamilyTest, IgnoreRecoveredLog) { + string backup_logs = dbname_ + "/backup_logs"; + + // delete old files in backup_logs directory + env_->CreateDirIfMissing(backup_logs); + vector old_files; + env_->GetChildren(backup_logs, &old_files); + for (auto& file : old_files) { + if (file != "." && file != "..") { + env_->DeleteFile(backup_logs + "/" + file); + } + } + + column_family_options_.merge_operator = + MergeOperators::CreateUInt64AddOperator(); + db_options_.wal_dir = dbname_ + "/logs"; + Destroy(); + ASSERT_OK(Open({"default"})); + CreateColumnFamilies({"cf1", "cf2"}); + + // fill up the DB + string one, two, three; + PutFixed64(&one, 1); + PutFixed64(&two, 2); + PutFixed64(&three, 3); + ASSERT_OK(Merge(0, "foo", one)); + ASSERT_OK(Merge(1, "mirko", one)); + ASSERT_OK(Merge(0, "foo", one)); + ASSERT_OK(Merge(2, "bla", one)); + ASSERT_OK(Merge(2, "fodor", one)); + ASSERT_OK(Merge(0, "bar", one)); + ASSERT_OK(Merge(2, "bla", one)); + ASSERT_OK(Merge(1, "mirko", two)); + ASSERT_OK(Merge(1, "franjo", one)); + + // copy the logs to backup + vector logs; + env_->GetChildren(db_options_.wal_dir, &logs); + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log); + } + } + + // recover the DB + Close(); + + // 1. check consistency + // 2. copy the logs from backup back to WAL dir. if the recovery happens + // again on the same log files, this should lead to incorrect results + // due to applying merge operator twice + // 3. check consistency + for (int iter = 0; iter < 2; ++iter) { + // assert consistency + ASSERT_OK(Open({"default", "cf1", "cf2"})); + ASSERT_EQ(two, Get(0, "foo")); + ASSERT_EQ(one, Get(0, "bar")); + ASSERT_EQ(three, Get(1, "mirko")); + ASSERT_EQ(one, Get(1, "franjo")); + ASSERT_EQ(one, Get(2, "fodor")); + ASSERT_EQ(two, Get(2, "bla")); + Close(); + + if (iter == 0) { + // copy the logs from backup back to wal dir + for (auto& log : logs) { + if (log != ".." && log != ".") { + CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log); + } + } + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index d963212df5..f8f52a9b64 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -309,6 +310,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) versions_.reset(new VersionSet(dbname_, &options_, storage_options_, table_cache_.get(), &internal_comparator_)); + column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); dumpLeveldbBuildVersion(options_.info_log.get()); options_.Dump(options_.info_log.get()); @@ -494,7 +497,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // store the current filenum, lognum, etc deletion_state.manifest_file_number = versions_->ManifestFileNumber(); - deletion_state.log_number = versions_->LogNumber(); + deletion_state.log_number = versions_->MinLogNumber(); deletion_state.prev_log_number = versions_->PrevLogNumber(); if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) { @@ -860,7 +863,7 @@ Status DBImpl::Recover( // Note that PrevLogNumber() is no longer used, but we pay // attention to it in case we are recovering a database // produced by an older version of rocksdb. - const uint64_t min_log = versions_->LogNumber(); + const uint64_t min_log = versions_->MinLogNumber(); const uint64_t prev_log = versions_->PrevLogNumber(); std::vector filenames; s = env_->GetChildren(options_.wal_dir, &filenames); @@ -924,7 +927,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, mutex_.AssertHeld(); - VersionEdit edit; + std::unordered_map version_edits; + for (auto cfd : *versions_->GetColumnFamilySet()) { + VersionEdit edit; + edit.SetColumnFamily(cfd->id); + version_edits.insert({cfd->id, edit}); + } // Open the log file std::string fname = LogFileName(options_.wal_dir, log_number); @@ -955,7 +963,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, std::string scratch; Slice record; WriteBatch batch; - bool memtable_empty = true; while (reader.ReadRecord(&record, &scratch)) { if (record.size() < 12) { reporter.Corruption( @@ -964,9 +971,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, } WriteBatchInternal::SetContents(&batch, record); - status = - WriteBatchInternal::InsertInto(&batch, default_cfd_->mem, &options_); - memtable_empty = false; + // filter out all the column families that have already + // flushed memtables with log_number + column_family_memtables_->SetLogNumber(log_number); + status = WriteBatchInternal::InsertInto( + &batch, column_family_memtables_.get(), &options_); + column_family_memtables_->SetLogNumber(0); + MaybeIgnoreError(&status); if (!status.ok()) { return status; @@ -978,38 +989,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, *max_sequence = last_seq; } - if (!read_only && default_cfd_->mem->ApproximateMemoryUsage() > - options_.write_buffer_size) { - status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); - // we still want to clear memtable, even if the recovery failed - default_cfd_->CreateNewMemtable(); - memtable_empty = true; - if (!status.ok()) { - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - return status; + if (!read_only) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->mem->ApproximateMemoryUsage() > + cfd->options.write_buffer_size) { + auto iter = version_edits.find(cfd->id); + assert(iter != version_edits.end()); + VersionEdit* edit = &iter->second; + status = WriteLevel0TableForRecovery(cfd->mem, edit); + // we still want to clear the memtable, even if the recovery failed + cfd->CreateNewMemtable(); + if (!status.ok()) { + // Reflect errors immediately so that conditions like full + // file-systems cause the DB::Open() to fail. + return status; + } + } } } } - if (!memtable_empty && !read_only) { - status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit); - default_cfd_->CreateNewMemtable(); - if (!status.ok()) { - return status; - } - } + if (!read_only) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + auto iter = version_edits.find(cfd->id); + assert(iter != version_edits.end()); + VersionEdit* edit = &iter->second; - if (edit.NumEntries() > 0) { - // if read_only, NumEntries() will be 0 - assert(!read_only); - // writing log number in the manifest means that any log file - // with number strongly less than (log_number + 1) is already - // recovered and should be ignored on next reincarnation. - // Since we already recovered log_number, we want all logs - // with numbers `<= log_number` (includes this one) to be ignored - edit.SetLogNumber(log_number + 1); - status = versions_->LogAndApply(default_cfd_, &edit, &mutex_); + // flush the final memtable + status = WriteLevel0TableForRecovery(cfd->mem, edit); + // we still want to clear the memtable, even if the recovery failed + cfd->CreateNewMemtable(); + if (!status.ok()) { + return status; + } + + // write MANIFEST with update + // writing log number in the manifest means that any log file + // with number strongly less than (log_number + 1) is already + // recovered and should be ignored on next reincarnation. + // Since we already recovered log_number, we want all logs + // with numbers `<= log_number` (includes this one) to be ignored + edit->SetLogNumber(log_number + 1); + status = versions_->LogAndApply(cfd, edit, &mutex_); + if (!status.ok()) { + return status; + } + } } return status; @@ -2737,7 +2762,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { Status DBImpl::Get(const ReadOptions& options, const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { - return GetImpl(options, key, value); + return GetImpl(options, column_family, key, value); } // DeletionState gets created and destructed outside of the lock -- we @@ -2784,12 +2809,19 @@ SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, } Status DBImpl::GetImpl(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found) { - Status s; - StopWatch sw(env_, options_.statistics.get(), DB_GET, false); + + mutex_.Lock(); + auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id); + // this is asserting because client calling Get() with undefined + // ColumnFamilyHandle is undefined behavior. + assert(cfd != nullptr); + SuperVersion* get_version = cfd->super_version->Ref(); + mutex_.Unlock(); + SequenceNumber snapshot; if (options.snapshot != nullptr) { snapshot = reinterpret_cast(options.snapshot)->number_; @@ -2797,17 +2829,13 @@ Status DBImpl::GetImpl(const ReadOptions& options, snapshot = versions_->LastSequence(); } - // This can be replaced by using atomics and spinlock instead of big mutex - mutex_.Lock(); - SuperVersion* get_version = default_cfd_->super_version->Ref(); - mutex_.Unlock(); - bool have_stat_update = false; Version::GetStats stats; // Prepare to store a list of merge operations if merge occurs. MergeContext merge_context; + Status s; // First look in the memtable, then in the immutable memtable (if any). // s is both in/out. When in, s could either be OK or MergeInProgress. // merge_operands will contain the sequence of merges in the latter case. @@ -2957,6 +2985,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, // add to internal data structures versions_->CreateColumnFamily(options, &edit); } + Log(options_.info_log, "Created column family %s\n", + column_family_name.c_str()); return s; } @@ -2976,6 +3006,9 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { // remove from internal data structures versions_->DropColumnFamily(&edit); } + // TODO(icanadi) PurgeObsoletetFiles here + Log(options_.info_log, "Dropped column family with id %u\n", + column_family.id); return s; } @@ -2989,7 +3022,7 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, } ReadOptions roptions = options; roptions.read_tier = kBlockCacheTier; // read from block cache only - auto s = GetImpl(roptions, key, value, value_found); + auto s = GetImpl(roptions, column_family, key, value, value_found); // If options.block_cache != nullptr and the index block of the table didn't // not present in block_cache, the return value will be Status::Incomplete. @@ -3102,7 +3135,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes - // into default_cfd_->mem. + // into memtables { mutex_.Unlock(); WriteBatch* updates = nullptr; @@ -3148,9 +3181,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, default_cfd_->mem, - &options_, this, - options_.filter_deletes); + // TODO(icanadi) this accesses column_family_set_ without any lock. + // We'll need to add a spinlock for reading that we also lock when we + // write to a column family (only on column family add/drop, which is + // a very rare action) + status = WriteBatchInternal::InsertInto( + updates, column_family_memtables_.get(), &options_, this, + options_.filter_deletes); + if (!status.ok()) { // Panic for in-memory corruptions // Note that existing logic was not sound. Any partial failure writing @@ -3995,9 +4033,12 @@ Status DB::OpenWithColumnFamilies( if (s.ok()) { lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size); VersionEdit edit; - edit.SetLogNumber(new_log_number); impl->logfile_number_ = new_log_number; impl->log_.reset(new log::Writer(std::move(lfile))); + // We use this LogAndApply just to store the next file number, the one + // that we used by calling impl->versions_->NewFileNumber() + // The used log number are already written to manifest in RecoverLogFile() + // method s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_, impl->db_directory_.get()); } diff --git a/db/db_impl.h b/db/db_impl.h index f16d4a65c3..1355fa4987 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -399,6 +399,7 @@ class DBImpl : public DB { uint64_t logfile_number_; unique_ptr log_; ColumnFamilyData* default_cfd_; + unique_ptr column_family_memtables_; // An ordinal representing the current SuperVersion. Updated by // InstallSuperVersion(), i.e. incremented every time super_version_ @@ -603,9 +604,8 @@ class DBImpl : public DB { // Function that Get and KeyMayExist call with no_io true or false // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, - const Slice& key, - std::string* value, - bool* value_found = nullptr); + const ColumnFamilyHandle& column_family, const Slice& key, + std::string* value, bool* value_found = nullptr); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/version_set.cc b/db/version_set.cc index 0a717de3af..e7e479d73c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1567,6 +1567,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, manifest_file_size_ = new_manifest_file_size; AppendVersion(column_family_data, v); log_number_ = edit->log_number_; + column_family_data->log_number = edit->log_number_; prev_log_number_ = edit->prev_log_number_; } else { @@ -1753,6 +1754,10 @@ Status VersionSet::Recover( break; } + if (edit.has_log_number_) { + cfd->log_number = edit.log_number_; + } + // if it is not column family add or column family drop, // then it's a file add/delete, which should be forwarded // to builder @@ -1838,6 +1843,11 @@ Status VersionSet::Recover( (unsigned long)last_sequence_, (unsigned long)log_number_, (unsigned long)prev_log_number_); + + for (auto cfd : *column_family_set_) { + Log(options_->info_log, "Column family \"%s\", log number is %lu\n", + cfd->name.c_str(), cfd->log_number); + } } for (auto builder : builders) { @@ -2140,6 +2150,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { f->largest_seqno); } } + edit.SetLogNumber(cfd->log_number); std::string record; edit.EncodeTo(&record); Status s = log->AddRecord(record); diff --git a/db/version_set.h b/db/version_set.h index 1cda55ca15..57ea509a4d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -349,13 +349,26 @@ class VersionSet { // Mark the specified file number as used. void MarkFileNumberUsed(uint64_t number); - // Return the current log file number. + // Return the current log file number. This is the biggest log_number from + // all column families uint64_t LogNumber() const { return log_number_; } // Return the log file number for the log file that is currently // being compacted, or zero if there is no such log file. uint64_t PrevLogNumber() const { return prev_log_number_; } + // Returns the minimum log number such that all + // log numbers less than or equal to it can be deleted + uint64_t MinLogNumber() const { + uint64_t min_log_num = 0; + for (auto cfd : *column_family_set_) { + if (min_log_num == 0 || min_log_num > cfd->log_number) { + min_log_num = cfd->log_number; + } + } + return min_log_num; + } + int NumberLevels() const { return num_levels_; } // Pick level and inputs for a new compaction. @@ -433,7 +446,7 @@ class VersionSet { friend class Compaction; friend class Version; - // TODO temporarily until we have what ColumnFamilyData needs (icmp_) + // TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_) friend struct ColumnFamilyData; struct LogReporter : public log::Reader::Reporter { diff --git a/db/write_batch.cc b/db/write_batch.cc index af4790ce5a..c6f096476d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -230,17 +230,19 @@ class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; + ColumnFamilyMemTables* cf_mems_; const Options* options_; DBImpl* db_; const bool filter_deletes_; MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts, DB* db, const bool filter_deletes) - : sequence_(sequence), - mem_(mem), - options_(opts), - db_(reinterpret_cast(db)), - filter_deletes_(filter_deletes) { + : sequence_(sequence), + mem_(mem), + cf_mems_(nullptr), + options_(opts), + db_(reinterpret_cast(db)), + filter_deletes_(filter_deletes) { assert(mem_); if (filter_deletes_) { assert(options_); @@ -248,18 +250,50 @@ class MemTableInserter : public WriteBatch::Handler { } } + MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, + const Options* opts, DB* db, const bool filter_deletes) + : sequence_(sequence), + mem_(nullptr), + cf_mems_(cf_mems), + options_(opts), + db_(reinterpret_cast(db)), + filter_deletes_(filter_deletes) { + assert(cf_mems); + if (filter_deletes_) { + assert(options_); + assert(db_); + } + } + + // returns nullptr if the update to the column family is not needed + MemTable* GetMemTable(uint32_t column_family_id) { + if (mem_ != nullptr) { + return (column_family_id == 0) ? mem_ : nullptr; + } else { + return cf_mems_->GetMemTable(column_family_id); + } + } + virtual void PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { - if (options_->inplace_update_support - && mem_->Update(sequence_, kTypeValue, key, value)) { + MemTable* mem = GetMemTable(column_family_id); + if (mem == nullptr) { + return; + } + if (options_->inplace_update_support && + mem->Update(sequence_, kTypeValue, key, value)) { RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { - mem_->Add(sequence_, kTypeValue, key, value); + mem->Add(sequence_, kTypeValue, key, value); } sequence_++; } virtual void MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { + MemTable* mem = GetMemTable(column_family_id); + if (mem == nullptr) { + return; + } bool perform_merge = false; if (options_->max_successive_merges > 0 && db_ != nullptr) { @@ -267,7 +301,7 @@ class MemTableInserter : public WriteBatch::Handler { // Count the number of successive merges at the head // of the key in the memtable - size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey); + size_t num_merges = mem->CountSuccessiveMergeEntries(lkey); if (num_merges >= options_->max_successive_merges) { perform_merge = true; @@ -307,18 +341,22 @@ class MemTableInserter : public WriteBatch::Handler { perform_merge = false; } else { // 3) Add value to memtable - mem_->Add(sequence_, kTypeValue, key, new_value); + mem->Add(sequence_, kTypeValue, key, new_value); } } if (!perform_merge) { // Add merge operator to memtable - mem_->Add(sequence_, kTypeMerge, key, value); + mem->Add(sequence_, kTypeMerge, key, value); } sequence_++; } virtual void DeleteCF(uint32_t column_family_id, const Slice& key) { + MemTable* mem = GetMemTable(column_family_id); + if (mem == nullptr) { + return; + } if (filter_deletes_) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; @@ -330,7 +368,7 @@ class MemTableInserter : public WriteBatch::Handler { return; } } - mem_->Add(sequence_, kTypeDeletion, key, Slice()); + mem->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; @@ -344,6 +382,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem, return b->Iterate(&inserter); } +Status WriteBatchInternal::InsertInto(const WriteBatch* b, + ColumnFamilyMemTables* memtables, + const Options* opts, DB* db, + const bool filter_deletes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, opts, + db, filter_deletes); + return b->Iterate(&inserter); +} + void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index b8991732f9..244799fc30 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -17,6 +17,11 @@ namespace rocksdb { class MemTable; +class ColumnFamilyMemTables { + public: + virtual MemTable* GetMemTable(uint32_t column_family_id) = 0; +}; + // WriteBatchInternal provides static methods for manipulating a // WriteBatch that we don't want in the public WriteBatch interface. class WriteBatchInternal { @@ -51,6 +56,11 @@ class WriteBatchInternal { const Options* opts, DB* db = nullptr, const bool filter_del = false); + static Status InsertInto(const WriteBatch* batch, + ColumnFamilyMemTables* memtables, + const Options* opts, DB* db = nullptr, + const bool filter_del = false); + static void Append(WriteBatch* dst, const WriteBatch* src); };