Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_impl.h
	db/db_impl_readonly.cc
	db/version_set.cc
Igor Canadi 2014-01-22 10:59:07 -08:00
commit 92a022ad07
8 changed files with 173 additions and 80 deletions

View File

@@ -26,7 +26,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
: level_(level),
out_level_(out_level),
max_output_file_size_(target_file_size),
maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes),
max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes),
input_version_(input_version),
number_levels_(input_version_->NumberLevels()),
seek_compaction_(seek_compaction),
@@ -64,7 +64,7 @@ bool Compaction::IsTrivialMove() const {
return (level_ != out_level_ &&
num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_);
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
}
void Compaction::AddInputDeletions(VersionEdit* edit) {
@@ -117,7 +117,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
}
seen_key_ = true;
if (overlapped_bytes_ > maxGrandParentOverlapBytes_) {
if (overlapped_bytes_ > max_grandparent_overlap_bytes_) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;
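
The hunk above only shows the cut decision; overlapped_bytes_ accumulates as the compaction walks the grandparent files. Below is a minimal, self-contained sketch of that bookkeeping under simplified types (hypothetical helper ShouldCutOutput and struct GrandparentFile, not the actual Compaction member):

#include <cstdint>
#include <string>
#include <vector>

// Stand-in for FileMetaData: only the fields the sketch needs.
struct GrandparentFile {
  uint64_t file_size;
  std::string largest_key;  // stand-in for largest.Encode()
};

// Returns true when the output file currently being built already overlaps
// more than max_grandparent_overlap_bytes of grandparent data, i.e. the
// compaction should close this output and start a new one (mirrors the
// renamed max_grandparent_overlap_bytes_ check above).
bool ShouldCutOutput(const std::string& key,
                     const std::vector<GrandparentFile>& grandparents,
                     size_t& grandparent_index, bool& seen_key,
                     uint64_t& overlapped_bytes,
                     uint64_t max_grandparent_overlap_bytes) {
  // Advance past grandparent files that end before `key`, charging their
  // size to the current output file.
  while (grandparent_index < grandparents.size() &&
         key > grandparents[grandparent_index].largest_key) {
    if (seen_key) {
      overlapped_bytes += grandparents[grandparent_index].file_size;
    }
    ++grandparent_index;
  }
  seen_key = true;
  if (overlapped_bytes > max_grandparent_overlap_bytes) {
    overlapped_bytes = 0;  // the next output file starts with a clean slate
    return true;
  }
  return false;
}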

View File

@@ -33,9 +33,14 @@ class Compaction {
// "which" must be either 0 or 1
int num_input_files(int which) const { return inputs_[which].size(); }
// Returns input version of the compaction
Version* input_version() const { return input_version_; }
// Return the ith input file at "level()+which" ("which" must be 0 or 1).
FileMetaData* input(int which, int i) const { return inputs_[which][i]; }
std::vector<FileMetaData*>* inputs(int which) { return &inputs_[which]; }
// Maximum size of files to build during this compaction.
uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
@@ -74,8 +79,6 @@ class Compaction {
bool IsFullCompaction() { return is_full_compaction_; }
private:
friend class Version;
friend class VersionSet;
friend class CompactionPicker;
friend class UniversalCompactionPicker;
friend class LevelCompactionPicker;
@@ -87,7 +90,7 @@ class Compaction {
int level_;
int out_level_; // level to which output files are stored
uint64_t max_output_file_size_;
uint64_t maxGrandParentOverlapBytes_;
uint64_t max_grandparent_overlap_bytes_;
Version* input_version_;
VersionEdit* edit_;
int number_levels_;

View File

@@ -863,16 +863,13 @@ void DBImpl::PurgeObsoleteWALFiles() {
}
}
// If externalTable is set, then apply recovered transactions
// to that table. This is used for readonly mode.
Status DBImpl::Recover(
VersionEdit* edit,
const std::vector<ColumnFamilyDescriptor>& column_families,
MemTable* external_table, bool error_if_log_file_exist) {
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_log_file_exist) {
mutex_.AssertHeld();
assert(db_lock_ == nullptr);
if (!external_table) {
if (!read_only) {
// We call CreateDirIfMissing() as the directory may already exist (if we
// are reopening a DB); when this happens we don't want creating the
// directory to cause an error. However, we need to check if creating the
@@ -966,12 +963,12 @@ Status DBImpl::Recover(
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], edit, &max_sequence, external_table);
for (size_t i = 0; s.ok() && i < logs.size(); i++) {
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
s = RecoverLogFile(logs[i], &max_sequence, read_only);
}
if (s.ok()) {
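
A compact way to see what the MarkFileNumberUsed() call in the loop above guards against is the sketch below. It assumes VersionSet hands out file numbers from a monotonically increasing counter (the real method may differ in detail); the struct and starting value are illustrative only:

#include <cstdint>

// Sketch: the allocator must stay strictly ahead of every file number
// already present on disk, otherwise a newly created file could reuse
// the number of a WAL written by the previous incarnation that never
// made it into the MANIFEST.
struct FileNumberAllocator {
  uint64_t next_file_number = 2;  // illustrative starting value

  void MarkFileNumberUsed(uint64_t number) {
    if (next_file_number <= number) {
      next_file_number = number + 1;
    }
  }
  uint64_t NewFileNumber() { return next_file_number++; }
};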
@@ -986,10 +983,8 @@ Status DBImpl::Recover(
return s;
}
Status DBImpl::RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence,
MemTable* external_table) {
Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
bool read_only) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
@@ -1006,6 +1001,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
mutex_.AssertHeld();
VersionEdit edit;
// Open the log file
std::string fname = LogFileName(options_.wal_dir, log_number);
unique_ptr<SequentialFile> file;
@@ -1035,11 +1032,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
std::string scratch;
Slice record;
WriteBatch batch;
MemTable* mem = nullptr;
if (external_table) {
mem = external_table;
}
while (reader.ReadRecord(&record, &scratch) && status.ok()) {
bool memtable_empty = true;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < 12) {
reporter.Corruption(
record.size(), Status::Corruption("log record too small"));
@@ -1047,14 +1041,11 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
}
WriteBatchInternal::SetContents(&batch, record);
if (mem == nullptr) {
mem = new MemTable(internal_comparator_, options_);
mem->Ref();
}
status = WriteBatchInternal::InsertInto(&batch, mem, &options_);
status = WriteBatchInternal::InsertInto(&batch, mem_, &options_);
memtable_empty = false;
MaybeIgnoreError(&status);
if (!status.ok()) {
break;
return status;
}
const SequenceNumber last_seq =
WriteBatchInternal::Sequence(&batch) +
@@ -1063,28 +1054,44 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
*max_sequence = last_seq;
}
if (!external_table &&
mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem, edit);
if (!read_only &&
mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem_, &edit);
// we still want to clear memtable, even if the recovery failed
delete mem_->Unref();
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
memtable_empty = true;
if (!status.ok()) {
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
break;
return status;
}
delete mem->Unref();
mem = nullptr;
}
}
if (status.ok() && mem != nullptr && !external_table) {
status = WriteLevel0TableForRecovery(mem, edit);
// Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail.
if (!memtable_empty && !read_only) {
status = WriteLevel0TableForRecovery(mem_, &edit);
delete mem_->Unref();
mem_ = new MemTable(internal_comparator_, options_);
mem_->Ref();
if (!status.ok()) {
return status;
}
}
if (mem != nullptr && !external_table) {
delete mem->Unref();
if (edit.NumEntries() > 0) {
// if read_only, NumEntries() will be 0
assert(!read_only);
// writing log number in the manifest means that any log file
// with number strictly less than (log_number + 1) is already
// recovered and should be ignored on next reincarnation.
// Since we already recovered log_number, we want all logs
// with numbers `<= log_number` (including this one) to be ignored
edit.SetLogNumber(log_number + 1);
status = versions_->LogAndApply(&edit, &mutex_);
}
return status;
}
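
To see why SetLogNumber(log_number + 1) makes already-recovered WALs harmless on a later open, here is a simplified sketch of the log selection the next recovery pass performs. The helper LogsToReplay is hypothetical and the real code also consults other state; only the lower-bound idea is taken from the hunk above:

#include <algorithm>
#include <cstdint>
#include <vector>

// Sketch: the manifest's log number acts as a lower bound. Logs with a
// strictly smaller number were already flushed to L0 and are skipped, so
// their put/merge records are never applied a second time.
std::vector<uint64_t> LogsToReplay(const std::vector<uint64_t>& logs_on_disk,
                                   uint64_t manifest_log_number) {
  std::vector<uint64_t> to_replay;
  for (uint64_t number : logs_on_disk) {
    if (number >= manifest_log_number) {
      to_replay.push_back(number);
    }
  }
  std::sort(to_replay.begin(), to_replay.end());  // replay in generation order
  return to_replay;
}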
@@ -3939,9 +3946,7 @@ Status DB::OpenWithColumnFamilies(
return s;
}
impl->mutex_.Lock();
VersionEdit edit;
// Handles create_if_missing, error_if_exists
s = impl->Recover(&edit, column_families);
s = impl->Recover(); // Handles create_if_missing, error_if_exists
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
unique_ptr<WritableFile> lfile;
@@ -3953,6 +3958,7 @@ Status DB::OpenWithColumnFamilies(
);
if (s.ok()) {
lfile->SetPreallocationBlockSize(1.1 * impl->options_.write_buffer_size);
VersionEdit edit;
edit.SetLogNumber(new_log_number);
impl->logfile_number_ = new_log_number;
impl->log_.reset(new log::Writer(std::move(lfile)));

View File

@@ -301,10 +301,8 @@ class DBImpl : public DB {
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates.
Status Recover(VersionEdit* edit,
const std::vector<ColumnFamilyDescriptor>& column_families,
MemTable* external_table = nullptr,
bool error_if_log_file_exist = false);
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false);
void MaybeIgnoreError(Status* s) const;
@@ -318,10 +316,8 @@ class DBImpl : public DB {
Status FlushMemTableToOutputFile(bool* madeProgress,
DeletionState& deletion_state);
Status RecoverLogFile(uint64_t log_number,
VersionEdit* edit,
SequenceNumber* max_sequence,
MemTable* external_table);
Status RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
bool read_only);
// The following two methods are used to flush a memtable to
// storage. The first one is used at database RecoveryTime (when the

View File

@@ -85,14 +85,12 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DBImplReadOnly* impl = new DBImplReadOnly(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(default_column_family_name, cf_options));
Status s = impl->Recover(&edit, column_families, impl->GetMemTable(),
error_if_log_file_exist);
Status s = impl->Recover(column_families, true /* read only */, error_if_log_file_exist);
impl->mutex_.Unlock();
if (s.ok()) {
*dbptr = impl;

View File

@@ -672,6 +672,31 @@ class DBTest {
ASSERT_EQ(IterStatus(iter), expected_key);
delete iter;
}
void CopyFile(const std::string& source, const std::string& destination,
uint64_t size = 0) {
const EnvOptions soptions;
unique_ptr<SequentialFile> srcfile;
ASSERT_OK(env_->NewSequentialFile(source, &srcfile, soptions));
unique_ptr<WritableFile> destfile;
ASSERT_OK(env_->NewWritableFile(destination, &destfile, soptions));
if (size == 0) {
// default argument means copy everything
ASSERT_OK(env_->GetFileSize(source, &size));
}
char buffer[4096];
Slice slice;
while (size > 0) {
uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
ASSERT_OK(srcfile->Read(one, &slice, buffer));
ASSERT_OK(destfile->Append(slice));
size -= slice.size();
}
ASSERT_OK(destfile->Close());
}
};
static std::string Key(int i) {
@@ -1474,6 +1499,82 @@ TEST(DBTest, Recover) {
} while (ChangeOptions());
}
TEST(DBTest, IgnoreRecoveredLog) {
std::string backup_logs = dbname_ + "/backup_logs";
// delete old files in backup_logs directory
env_->CreateDirIfMissing(backup_logs);
std::vector<std::string> old_files;
env_->GetChildren(backup_logs, &old_files);
for (auto& file : old_files) {
if (file != "." && file != "..") {
env_->DeleteFile(backup_logs + "/" + file);
}
}
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
options.wal_dir = dbname_ + "/logs";
DestroyAndReopen(&options);
// fill up the DB
std::string one, two;
PutFixed64(&one, 1);
PutFixed64(&two, 2);
ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one)));
ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one)));
// copy the logs to backup
std::vector<std::string> logs;
env_->GetChildren(options.wal_dir, &logs);
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
}
}
// recover the DB
Reopen(&options);
ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar"));
Close();
// copy the logs from backup back to wal dir
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
}
}
// this should ignore the log files; recovery should not happen again.
// if recovery ran again, the same merge operands would be applied twice,
// leading to incorrect results
Reopen(&options);
ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar"));
Close();
Destroy(&options);
// copy the logs from backup back to wal dir
env_->CreateDirIfMissing(options.wal_dir);
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
// we won't be needing this file anymore
env_->DeleteFile(backup_logs + "/" + log);
}
}
// assert that we successfully recovered only from logs, even though we
// destroyed the DB
Reopen(&options);
ASSERT_EQ(two, Get("foo"));
ASSERT_EQ(one, Get("bar"));
Close();
} while (ChangeOptions());
}
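
To make the failure mode this test guards against concrete (assuming the uint64-add semantics of CreateUInt64AddOperator used above): the first recovery leaves foo = 1 + 1 = 2 and bar = 1, which is exactly what the assertions expect; if the copied WALs were replayed a second time, the same operands would be added again, giving foo = 4 and bar = 2, and ASSERT_EQ(two, Get("foo")) would fail.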
TEST(DBTest, RollLog) {
do {
ASSERT_OK(Put("foo", "v1"));
@@ -3616,7 +3717,6 @@ TEST(DBTest, BloomFilter) {
TEST(DBTest, SnapshotFiles) {
do {
Options options = CurrentOptions();
const EnvOptions soptions;
options.write_buffer_size = 100000000; // Large write buffer
Reopen(&options);
@@ -3672,20 +3772,7 @@ TEST(DBTest, SnapshotFiles) {
}
}
}
unique_ptr<SequentialFile> srcfile;
ASSERT_OK(env_->NewSequentialFile(src, &srcfile, soptions));
unique_ptr<WritableFile> destfile;
ASSERT_OK(env_->NewWritableFile(dest, &destfile, soptions));
char buffer[4096];
Slice slice;
while (size > 0) {
uint64_t one = std::min(uint64_t(sizeof(buffer)), size);
ASSERT_OK(srcfile->Read(one, &slice, buffer));
ASSERT_OK(destfile->Append(slice));
size -= slice.size();
}
ASSERT_OK(destfile->Close());
CopyFile(src, dest, size);
}
// release file snapshot

View File

@@ -2073,23 +2073,21 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
// Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level.
// TODO(opt): use concatenating iterator for level-0 if there is no overlap
const int space = (c->level() == 0 ? c->inputs_[0].size() + 1 : 2);
const int space = (c->level() == 0 ? c->inputs(0)->size() + 1 : 2);
Iterator** list = new Iterator*[space];
int num = 0;
for (int which = 0; which < 2; which++) {
if (!c->inputs_[which].empty()) {
if (!c->inputs(which)->empty()) {
if (c->level() + which == 0) {
const std::vector<FileMetaData*>& files = c->inputs_[which];
for (size_t i = 0; i < files.size(); i++) {
for (const auto& file : *c->inputs(which)) {
list[num++] = table_cache_->NewIterator(
options, storage_options_compactions_,
files[i]->number, files[i]->file_size, nullptr,
true /* for compaction */);
options, storage_options_compactions_, file->number,
file->file_size, nullptr, true /* for compaction */);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, &c->inputs_[which]),
new Version::LevelFileNumIterator(icmp_, c->inputs(which)),
&GetFileIterator, table_cache_, options, storage_options_,
true /* for compaction */);
}
@@ -2115,7 +2113,7 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
#ifndef NDEBUG
// TODO this only works for default column family now
Version* version = column_family_data_.find(0)->second->current;
if (c->input_version_ != version) {
if (c->input_version() != version) {
Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch");
}

View File

@@ -5,6 +5,7 @@
//
#include "util/statistics.h"
#include "rocksdb/statistics.h"
#include <algorithm>
#include <cstdio>
namespace rocksdb {
@@ -13,7 +14,11 @@ std::shared_ptr<Statistics> CreateDBStatistics() {
return std::make_shared<StatisticsImpl>();
}
StatisticsImpl::StatisticsImpl() {}
StatisticsImpl::StatisticsImpl() {
// Fill tickers_ with "zero". To ensure plasform indepedent, we used
// uint_fast64_t() instead literal `0` to represent zero.
std::fill(tickers_, tickers_ + TICKER_ENUM_MAX, uint_fast64_t());
}
StatisticsImpl::~StatisticsImpl() {}