Read from and write to different column families

Summary: This one is big. It adds ability to write to and read from different column families (see the unit test). It also supports recovery of different column families from log, which was the hardest part to reason about. We need to make sure to never delete the log file which has unflushed data from any column family. To support that, I added another concept, which is versions_->MinLogNumber()

Test Plan: Added a unit test in column_family_test

Reviewers: dhruba, haobo, sdong, kailiu

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15537
This commit is contained in:
Igor Canadi 2014-01-28 11:05:04 -08:00
parent c1071ed95c
commit f24a3ee52d
9 changed files with 400 additions and 74 deletions

View File

@ -69,7 +69,8 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
options(options),
mem(nullptr),
imm(options.min_write_buffer_number_to_merge),
super_version(nullptr) {}
super_version(nullptr),
log_number(0) {}
ColumnFamilyData::~ColumnFamilyData() {
if (super_version != nullptr) {
@ -167,4 +168,18 @@ void ColumnFamilySet::DropColumnFamily(uint32_t id) {
column_family_data_.erase(cfd);
}
MemTable* ColumnFamilyMemTablesImpl::GetMemTable(uint32_t column_family_id) {
auto cfd = column_family_set_->GetColumnFamily(column_family_id);
// TODO(icanadi): this should not be asserting. Rather, it should somehow
// return Corruption status back to the Iterator. This will require
// API change in WriteBatch::Handler, which is a public API
assert(cfd != nullptr);
if (log_number_ == 0 || log_number_ >= cfd->log_number) {
return cfd->mem;
} else {
return nullptr;
}
}
} // namespace rocksdb

View File

@ -15,6 +15,7 @@
#include "rocksdb/options.h"
#include "db/memtablelist.h"
#include "db/write_batch_internal.h"
namespace rocksdb {
@ -63,6 +64,11 @@ struct ColumnFamilyData {
MemTableList imm;
SuperVersion* super_version;
// This is the earliest log file number that contains data from this
// Column Family. All earlier log files must be ignored and not
// recovered from
uint64_t log_number;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, const ColumnFamilyOptions& options);
~ColumnFamilyData();
@ -122,4 +128,24 @@ class ColumnFamilySet {
uint32_t max_column_family_;
};
class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables {
public:
explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set)
: column_family_set_(column_family_set), log_number_(0) {}
// If column_family_data->log_number is bigger than log_number,
// the memtable will not be returned.
// If log_number == 0, the memtable will be always returned
void SetLogNumber(uint64_t log_number) { log_number_ = log_number; }
// Returns the column families memtable if log_number == 0 || log_number <=
// column_family_data->log_number.
// If column family doesn't exist, it asserts
virtual MemTable* GetMemTable(uint32_t column_family_id) override;
private:
ColumnFamilySet* column_family_set_;
uint64_t log_number_;
};
} // namespace rocksdb

View File

@ -8,8 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include "rocksdb/env.h"
#include "rocksdb/db.h"
#include "util/testharness.h"
#include "utilities/merge_operators.h"
#include <algorithm>
#include <vector>
@ -22,10 +24,10 @@ using namespace std;
class ColumnFamilyTest {
public:
ColumnFamilyTest() {
env_ = Env::Default();
dbname_ = test::TmpDir() + "/column_family_test";
db_options_.create_if_missing = true;
options_.create_if_missing = true;
DestroyDB(dbname_, options_);
DestroyDB(dbname_, Options(db_options_, column_family_options_));
}
void Close() {
@ -37,18 +39,77 @@ class ColumnFamilyTest {
vector<ColumnFamilyDescriptor> column_families;
for (auto x : cf) {
column_families.push_back(
ColumnFamilyDescriptor(x, ColumnFamilyOptions()));
ColumnFamilyDescriptor(x, column_family_options_));
}
vector <ColumnFamilyHandle> handles;
return DB::OpenWithColumnFamilies(db_options_, dbname_, column_families,
&handles, &db_);
&handles_, &db_);
}
Options options_;
void Destroy() {
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
}
void CreateColumnFamilies(const vector<string>& cfs) {
int cfi = handles_.size();
handles_.resize(cfi + cfs.size());
for (auto cf : cfs) {
ASSERT_OK(db_->CreateColumnFamily(column_family_options_, cf,
&handles_[cfi++]));
}
}
Status Put(int cf, const string& key, const string& value) {
return db_->Put(WriteOptions(), handles_[cf], Slice(key), Slice(value));
}
Status Merge(int cf, const string& key, const string& value) {
return db_->Merge(WriteOptions(), handles_[cf], Slice(key), Slice(value));
}
string Get(int cf, const string& key) {
ReadOptions options;
options.verify_checksums = true;
string result;
Status s = db_->Get(options, handles_[cf], Slice(key), &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
void CopyFile(const string& source, const string& destination,
uint64_t size = 0) {
const EnvOptions soptions;
unique_ptr<SequentialFile> srcfile;
ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
unique_ptr<WritableFile> destfile;
ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
if (size == 0) {
// default argument means copy everything
ASSERT_OK(env_->GetFileSize(source, &size));
}
char buffer[4096];
Slice slice;
while (size > 0) {
uint64_t one = min(uint64_t(sizeof(buffer)), size);
ASSERT_OK(srcfile->Read(one, &slice, buffer));
ASSERT_OK(destfile->Append(slice));
size -= slice.size();
}
ASSERT_OK(destfile->Close());
}
vector<ColumnFamilyHandle> handles_;
ColumnFamilyOptions column_family_options_;
DBOptions db_options_;
string dbname_;
DB* db_;
Env* env_;
};
TEST(ColumnFamilyTest, AddDrop) {
@ -74,6 +135,108 @@ TEST(ColumnFamilyTest, AddDrop) {
ASSERT_TRUE(families == vector<string>({"default", "four", "one", "three"}));
}
TEST(ColumnFamilyTest, ReadWrite) {
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"one", "two"});
Close();
ASSERT_OK(Open({"default", "one", "two"}));
ASSERT_OK(Put(0, "foo", "v1"));
ASSERT_OK(Put(0, "bar", "v2"));
ASSERT_OK(Put(1, "mirko", "v3"));
ASSERT_OK(Put(0, "foo", "v2"));
ASSERT_OK(Put(2, "fodor", "v5"));
for (int iter = 0; iter <= 3; ++iter) {
ASSERT_EQ("v2", Get(0, "foo"));
ASSERT_EQ("v2", Get(0, "bar"));
ASSERT_EQ("v3", Get(1, "mirko"));
ASSERT_EQ("v5", Get(2, "fodor"));
ASSERT_EQ("NOT_FOUND", Get(0, "fodor"));
ASSERT_EQ("NOT_FOUND", Get(1, "fodor"));
ASSERT_EQ("NOT_FOUND", Get(2, "foo"));
if (iter <= 1) {
// reopen
Close();
ASSERT_OK(Open({"default", "one", "two"}));
}
}
Close();
}
TEST(ColumnFamilyTest, IgnoreRecoveredLog) {
string backup_logs = dbname_ + "/backup_logs";
// delete old files in backup_logs directory
env_->CreateDirIfMissing(backup_logs);
vector<string> old_files;
env_->GetChildren(backup_logs, &old_files);
for (auto& file : old_files) {
if (file != "." && file != "..") {
env_->DeleteFile(backup_logs + "/" + file);
}
}
column_family_options_.merge_operator =
MergeOperators::CreateUInt64AddOperator();
db_options_.wal_dir = dbname_ + "/logs";
Destroy();
ASSERT_OK(Open({"default"}));
CreateColumnFamilies({"cf1", "cf2"});
// fill up the DB
string one, two, three;
PutFixed64(&one, 1);
PutFixed64(&two, 2);
PutFixed64(&three, 3);
ASSERT_OK(Merge(0, "foo", one));
ASSERT_OK(Merge(1, "mirko", one));
ASSERT_OK(Merge(0, "foo", one));
ASSERT_OK(Merge(2, "bla", one));
ASSERT_OK(Merge(2, "fodor", one));
ASSERT_OK(Merge(0, "bar", one));
ASSERT_OK(Merge(2, "bla", one));
ASSERT_OK(Merge(1, "mirko", two));
ASSERT_OK(Merge(1, "franjo", one));
// copy the logs to backup
vector<string> logs;
env_->GetChildren(db_options_.wal_dir, &logs);
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(db_options_.wal_dir + "/" + log, backup_logs + "/" + log);
}
}
// recover the DB
Close();
// 1. check consistency
// 2. copy the logs from backup back to WAL dir. if the recovery happens
// again on the same log files, this should lead to incorrect results
// due to applying merge operator twice
// 3. check consistency
for (int iter = 0; iter < 2; ++iter) {
// assert consistency
ASSERT_OK(Open({"default", "cf1", "cf2"}));
ASSERT_EQ(two, Get(0, "foo"));
ASSERT_EQ(one, Get(0, "bar"));
ASSERT_EQ(three, Get(1, "mirko"));
ASSERT_EQ(one, Get(1, "franjo"));
ASSERT_EQ(one, Get(2, "fodor"));
ASSERT_EQ(two, Get(2, "bla"));
Close();
if (iter == 0) {
// copy the logs from backup back to wal dir
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, db_options_.wal_dir + "/" + log);
}
}
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -17,6 +17,7 @@
#include <stdint.h>
#include <string>
#include <unordered_set>
#include <unordered_map>
#include <utility>
#include <vector>
@ -309,6 +310,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
versions_.reset(new VersionSet(dbname_, &options_, storage_options_,
table_cache_.get(), &internal_comparator_));
column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
dumpLeveldbBuildVersion(options_.info_log.get());
options_.Dump(options_.info_log.get());
@ -494,7 +497,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// store the current filenum, lognum, etc
deletion_state.manifest_file_number = versions_->ManifestFileNumber();
deletion_state.log_number = versions_->LogNumber();
deletion_state.log_number = versions_->MinLogNumber();
deletion_state.prev_log_number = versions_->PrevLogNumber();
if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) {
@ -860,7 +863,7 @@ Status DBImpl::Recover(
// Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of rocksdb.
const uint64_t min_log = versions_->LogNumber();
const uint64_t min_log = versions_->MinLogNumber();
const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames;
s = env_->GetChildren(options_.wal_dir, &filenames);
@ -924,7 +927,12 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
mutex_.AssertHeld();
VersionEdit edit;
std::unordered_map<int, VersionEdit> version_edits;
for (auto cfd : *versions_->GetColumnFamilySet()) {
VersionEdit edit;
edit.SetColumnFamily(cfd->id);
version_edits.insert({cfd->id, edit});
}
// Open the log file
std::string fname = LogFileName(options_.wal_dir, log_number);
@ -955,7 +963,6 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
std::string scratch;
Slice record;
WriteBatch batch;
bool memtable_empty = true;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(
@ -964,9 +971,13 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
}
WriteBatchInternal::SetContents(&batch, record);
status =
WriteBatchInternal::InsertInto(&batch, default_cfd_->mem, &options_);
memtable_empty = false;
// filter out all the column families that have already
// flushed memtables with log_number
column_family_memtables_->SetLogNumber(log_number);
status = WriteBatchInternal::InsertInto(
&batch, column_family_memtables_.get(), &options_);
column_family_memtables_->SetLogNumber(0);
MaybeIgnoreError(&status);
if (!status.ok()) {
return status;
@ -978,38 +989,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
*max_sequence = last_seq;
}
if (!read_only && default_cfd_->mem->ApproximateMemoryUsage() >
options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit);
// we still want to clear memtable, even if the recovery failed
default_cfd_->CreateNewMemtable();
memtable_empty = true;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
if (!read_only) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem->ApproximateMemoryUsage() >
cfd->options.write_buffer_size) {
auto iter = version_edits.find(cfd->id);
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(cfd->mem, edit);
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
return status;
}
}
}
}
}
if (!memtable_empty && !read_only) {
status = WriteLevel0TableForRecovery(default_cfd_->mem, &edit);
default_cfd_->CreateNewMemtable();
if (!status.ok()) {
return status;
}
}
if (!read_only) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto iter = version_edits.find(cfd->id);
assert(iter != version_edits.end());
VersionEdit* edit = &iter->second;
if (edit.NumEntries() > 0) {
// if read_only, NumEntries() will be 0
assert(!read_only);
// writing log number in the manifest means that any log file
// with number strongly less than (log_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit.SetLogNumber(log_number + 1);
status = versions_->LogAndApply(default_cfd_, &edit, &mutex_);
// flush the final memtable
status = WriteLevel0TableForRecovery(cfd->mem, edit);
// we still want to clear the memtable, even if the recovery failed
cfd->CreateNewMemtable();
if (!status.ok()) {
return status;
}
// write MANIFEST with update
// writing log number in the manifest means that any log file
// with number strongly less than (log_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (includes this one) to be ignored
edit->SetLogNumber(log_number + 1);
status = versions_->LogAndApply(cfd, edit, &mutex_);
if (!status.ok()) {
return status;
}
}
}
return status;
@ -2737,7 +2762,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
Status DBImpl::Get(const ReadOptions& options,
const ColumnFamilyHandle& column_family, const Slice& key,
std::string* value) {
return GetImpl(options, key, value);
return GetImpl(options, column_family, key, value);
}
// DeletionState gets created and destructed outside of the lock -- we
@ -2784,12 +2809,19 @@ SuperVersion* DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
}
Status DBImpl::GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
const ColumnFamilyHandle& column_family,
const Slice& key, std::string* value,
bool* value_found) {
Status s;
StopWatch sw(env_, options_.statistics.get(), DB_GET, false);
mutex_.Lock();
auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family.id);
// this is asserting because client calling Get() with undefined
// ColumnFamilyHandle is undefined behavior.
assert(cfd != nullptr);
SuperVersion* get_version = cfd->super_version->Ref();
mutex_.Unlock();
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
@ -2797,17 +2829,13 @@ Status DBImpl::GetImpl(const ReadOptions& options,
snapshot = versions_->LastSequence();
}
// This can be replaced by using atomics and spinlock instead of big mutex
mutex_.Lock();
SuperVersion* get_version = default_cfd_->super_version->Ref();
mutex_.Unlock();
bool have_stat_update = false;
Version::GetStats stats;
// Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context;
Status s;
// First look in the memtable, then in the immutable memtable (if any).
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
@ -2957,6 +2985,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
// add to internal data structures
versions_->CreateColumnFamily(options, &edit);
}
Log(options_.info_log, "Created column family %s\n",
column_family_name.c_str());
return s;
}
@ -2976,6 +3006,9 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) {
// remove from internal data structures
versions_->DropColumnFamily(&edit);
}
// TODO(icanadi) PurgeObsoletetFiles here
Log(options_.info_log, "Dropped column family with id %u\n",
column_family.id);
return s;
}
@ -2989,7 +3022,7 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
}
ReadOptions roptions = options;
roptions.read_tier = kBlockCacheTier; // read from block cache only
auto s = GetImpl(roptions, key, value, value_found);
auto s = GetImpl(roptions, column_family, key, value, value_found);
// If options.block_cache != nullptr and the index block of the table didn't
// not present in block_cache, the return value will be Status::Incomplete.
@ -3102,7 +3135,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into default_cfd_->mem.
// into memtables
{
mutex_.Unlock();
WriteBatch* updates = nullptr;
@ -3148,9 +3181,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, default_cfd_->mem,
&options_, this,
options_.filter_deletes);
// TODO(icanadi) this accesses column_family_set_ without any lock.
// We'll need to add a spinlock for reading that we also lock when we
// write to a column family (only on column family add/drop, which is
// a very rare action)
status = WriteBatchInternal::InsertInto(
updates, column_family_memtables_.get(), &options_, this,
options_.filter_deletes);
if (!status.ok()) {
// Panic for in-memory corruptions
// Note that existing logic was not sound. Any partial failure writing
@ -3995,9 +4033,12 @@ Status DB::OpenWithColumnFamilies(
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
VersionEdit edit;
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));
// We use this LogAndApply just to store the next file number, the one
// that we used by calling impl->versions_->NewFileNumber()
// The used log number are already written to manifest in RecoverLogFile()
// method
s = impl->versions_->LogAndApply(impl->default_cfd_, &edit, &impl->mutex_,
impl->db_directory_.get());
}

View File

@ -399,6 +399,7 @@ class DBImpl : public DB {
uint64_t logfile_number_;
unique_ptr<log::Writer> log_;
ColumnFamilyData* default_cfd_;
unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
// An ordinal representing the current SuperVersion. Updated by
// InstallSuperVersion(), i.e. incremented every time super_version_
@ -603,9 +604,8 @@ class DBImpl : public DB {
// Function that Get and KeyMayExist call with no_io true or false
// Note: 'value_found' from KeyMayExist propagates here
Status GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found = nullptr);
const ColumnFamilyHandle& column_family, const Slice& key,
std::string* value, bool* value_found = nullptr);
};
// Sanitize db options. The caller should delete result.info_log if

View File

@ -1567,6 +1567,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
manifest_file_size_ = new_manifest_file_size;
AppendVersion(column_family_data, v);
log_number_ = edit->log_number_;
column_family_data->log_number = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
@ -1753,6 +1754,10 @@ Status VersionSet::Recover(
break;
}
if (edit.has_log_number_) {
cfd->log_number = edit.log_number_;
}
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
@ -1838,6 +1843,11 @@ Status VersionSet::Recover(
(unsigned long)last_sequence_,
(unsigned long)log_number_,
(unsigned long)prev_log_number_);
for (auto cfd : *column_family_set_) {
Log(options_->info_log, "Column family \"%s\", log number is %lu\n",
cfd->name.c_str(), cfd->log_number);
}
}
for (auto builder : builders) {
@ -2140,6 +2150,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
f->largest_seqno);
}
}
edit.SetLogNumber(cfd->log_number);
std::string record;
edit.EncodeTo(&record);
Status s = log->AddRecord(record);

View File

@ -349,13 +349,26 @@ class VersionSet {
// Mark the specified file number as used.
void MarkFileNumberUsed(uint64_t number);
// Return the current log file number.
// Return the current log file number. This is the biggest log_number from
// all column families
uint64_t LogNumber() const { return log_number_; }
// Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }
// Returns the minimum log number such that all
// log numbers less than or equal to it can be deleted
uint64_t MinLogNumber() const {
uint64_t min_log_num = 0;
for (auto cfd : *column_family_set_) {
if (min_log_num == 0 || min_log_num > cfd->log_number) {
min_log_num = cfd->log_number;
}
}
return min_log_num;
}
int NumberLevels() const { return num_levels_; }
// Pick level and inputs for a new compaction.
@ -433,7 +446,7 @@ class VersionSet {
friend class Compaction;
friend class Version;
// TODO temporarily until we have what ColumnFamilyData needs (icmp_)
// TODO(icanadi) temporarily until we have what ColumnFamilyData needs (icmp_)
friend struct ColumnFamilyData;
struct LogReporter : public log::Reader::Reporter {

View File

@ -230,17 +230,19 @@ class MemTableInserter : public WriteBatch::Handler {
public:
SequenceNumber sequence_;
MemTable* mem_;
ColumnFamilyMemTables* cf_mems_;
const Options* options_;
DBImpl* db_;
const bool filter_deletes_;
MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts,
DB* db, const bool filter_deletes)
: sequence_(sequence),
mem_(mem),
options_(opts),
db_(reinterpret_cast<DBImpl*>(db)),
filter_deletes_(filter_deletes) {
: sequence_(sequence),
mem_(mem),
cf_mems_(nullptr),
options_(opts),
db_(reinterpret_cast<DBImpl*>(db)),
filter_deletes_(filter_deletes) {
assert(mem_);
if (filter_deletes_) {
assert(options_);
@ -248,18 +250,50 @@ class MemTableInserter : public WriteBatch::Handler {
}
}
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
const Options* opts, DB* db, const bool filter_deletes)
: sequence_(sequence),
mem_(nullptr),
cf_mems_(cf_mems),
options_(opts),
db_(reinterpret_cast<DBImpl*>(db)),
filter_deletes_(filter_deletes) {
assert(cf_mems);
if (filter_deletes_) {
assert(options_);
assert(db_);
}
}
// returns nullptr if the update to the column family is not needed
MemTable* GetMemTable(uint32_t column_family_id) {
if (mem_ != nullptr) {
return (column_family_id == 0) ? mem_ : nullptr;
} else {
return cf_mems_->GetMemTable(column_family_id);
}
}
virtual void PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
if (options_->inplace_update_support
&& mem_->Update(sequence_, kTypeValue, key, value)) {
MemTable* mem = GetMemTable(column_family_id);
if (mem == nullptr) {
return;
}
if (options_->inplace_update_support &&
mem->Update(sequence_, kTypeValue, key, value)) {
RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
} else {
mem_->Add(sequence_, kTypeValue, key, value);
mem->Add(sequence_, kTypeValue, key, value);
}
sequence_++;
}
virtual void MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
MemTable* mem = GetMemTable(column_family_id);
if (mem == nullptr) {
return;
}
bool perform_merge = false;
if (options_->max_successive_merges > 0 && db_ != nullptr) {
@ -267,7 +301,7 @@ class MemTableInserter : public WriteBatch::Handler {
// Count the number of successive merges at the head
// of the key in the memtable
size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey);
size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
if (num_merges >= options_->max_successive_merges) {
perform_merge = true;
@ -307,18 +341,22 @@ class MemTableInserter : public WriteBatch::Handler {
perform_merge = false;
} else {
// 3) Add value to memtable
mem_->Add(sequence_, kTypeValue, key, new_value);
mem->Add(sequence_, kTypeValue, key, new_value);
}
}
if (!perform_merge) {
// Add merge operator to memtable
mem_->Add(sequence_, kTypeMerge, key, value);
mem->Add(sequence_, kTypeMerge, key, value);
}
sequence_++;
}
virtual void DeleteCF(uint32_t column_family_id, const Slice& key) {
MemTable* mem = GetMemTable(column_family_id);
if (mem == nullptr) {
return;
}
if (filter_deletes_) {
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
@ -330,7 +368,7 @@ class MemTableInserter : public WriteBatch::Handler {
return;
}
}
mem_->Add(sequence_, kTypeDeletion, key, Slice());
mem->Add(sequence_, kTypeDeletion, key, Slice());
sequence_++;
}
};
@ -344,6 +382,15 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem,
return b->Iterate(&inserter);
}
Status WriteBatchInternal::InsertInto(const WriteBatch* b,
ColumnFamilyMemTables* memtables,
const Options* opts, DB* db,
const bool filter_deletes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(b), memtables, opts,
db, filter_deletes);
return b->Iterate(&inserter);
}
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size());

View File

@ -17,6 +17,11 @@ namespace rocksdb {
class MemTable;
class ColumnFamilyMemTables {
public:
virtual MemTable* GetMemTable(uint32_t column_family_id) = 0;
};
// WriteBatchInternal provides static methods for manipulating a
// WriteBatch that we don't want in the public WriteBatch interface.
class WriteBatchInternal {
@ -51,6 +56,11 @@ class WriteBatchInternal {
const Options* opts, DB* db = nullptr,
const bool filter_del = false);
static Status InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables,
const Options* opts, DB* db = nullptr,
const bool filter_del = false);
static void Append(WriteBatch* dst, const WriteBatch* src);
};