diff --git a/db/builder.cc b/db/builder.cc index 2b7c592832..db09c0d8c9 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, const SequenceNumber earliest_seqno_in_memtable) { Status s; meta->file_size = 0; + meta->smallest_seqno = meta->largest_seqno = 0; iter->SeekToFirst(); // If the sequence number of the smallest entry in the memtable is @@ -50,6 +51,8 @@ Status BuildTable(const std::string& dbname, // the first key is the smallest key Slice key = iter->key(); meta->smallest.DecodeFrom(key); + meta->smallest_seqno = GetInternalKeySeqno(key); + meta->largest_seqno = meta->smallest_seqno; MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), @@ -124,12 +127,18 @@ Status BuildTable(const std::string& dbname, // output last key builder->Add(Slice(prev_key), Slice(prev_value)); meta->largest.DecodeFrom(Slice(prev_key)); + SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key)); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } else { for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); + SequenceNumber seqno = GetInternalKeySeqno(key); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } } diff --git a/db/db_bench.cc b/db/db_bench.cc index 179896a77f..ad0765a316 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -137,6 +137,9 @@ static int FLAGS_min_write_buffer_number_to_merge = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// Run database in hybrid mode where all data resides in L0. +static bool FLAGS_hybrid_mode = false; + // Number of bytes to use as a cache of uncompressed data. // Negative means use default settings. static long FLAGS_cache_size = -1; @@ -1104,6 +1107,7 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1986,6 +1990,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; FLAGS_use_os_buffer = leveldb::EnvOptions().use_os_buffer; @@ -2044,6 +2050,8 @@ int main(int argc, char** argv) { FLAGS_min_write_buffer_number_to_merge = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index c5c156feb4..d5bd3cbd9e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -75,6 +75,7 @@ struct DBImpl::CompactionState { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + SequenceNumber smallest_seqno, largest_seqno; }; std::vector outputs; std::list allocated_file_numbers; @@ -759,7 +760,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { int level = 0; if (s.ok() && meta.file_size > 0) { edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -833,11 +835,13 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. - if (base != nullptr && options_.max_background_compactions <= 1) { + if (base != nullptr && options_.max_background_compactions <= 1 && + !options_.hybrid_mode) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -1356,7 +1360,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); + f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -1468,6 +1473,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file @@ -1478,10 +1484,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Over-estimate slightly so we don't end up just barely crossing // the threshold. compact->outfile->SetPreallocationBlockSize( - 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->level() + 1)); + 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->level() + 1)); + compact->compaction->output_level())); } return s; } @@ -1572,8 +1578,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); + options_.hybrid_mode? level : level + 1, + out.number, out.file_size, out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -1821,7 +1828,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot && + // Hybrid mode depends on the sequence number to determine + // time-order of files that is needed for compactions. + if (!options_.hybrid_mode && + bottommost_level && ikey.sequence < earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -1841,11 +1851,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + SequenceNumber seqno = GetInternalKeySeqno(newkey); if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(newkey); + compact->current_output()->smallest_seqno = seqno; + } else { + compact->current_output()->smallest_seqno = + std::min(compact->current_output()->smallest_seqno, seqno); } compact->current_output()->largest.DecodeFrom(newkey); compact->builder->Add(newkey, value); + compact->current_output()->largest_seqno = + std::max(compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/db_test.cc b/db/db_test.cc index 7ce4ac4198..8275029b48 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3306,7 +3306,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); + vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset.LogAndApply(&vbase, &mu)); @@ -3317,7 +3317,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); + vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); vset.LogAndApply(&vedit, &mu); } uint64_t stop_micros = env->NowMicros(); diff --git a/db/dbformat.h b/db/dbformat.h index 1a05166704..5d596ad1ae 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -173,6 +173,15 @@ inline void UpdateInternalKey(char* internal_key, EncodeFixed64(seqtype, newval); } +// Get the sequence number from the internal key +inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { + const size_t n = internal_key.size(); + assert(n >= 8); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return num >> 8; +} + + // A helper class useful for DBImpl::Get() class LookupKey { public: diff --git a/db/repair.cc b/db/repair.cc index d1c0c45255..09406781a4 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -88,6 +88,7 @@ class Repairer { private: struct TableInfo { FileMetaData meta; + SequenceNumber min_sequence; SequenceNumber max_sequence; }; @@ -263,6 +264,7 @@ class Repairer { ReadOptions(), storage_options_, t->meta.number, t->meta.file_size); bool empty = true; ParsedInternalKey parsed; + t->min_sequence = 0; t->max_sequence = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); @@ -279,6 +281,9 @@ class Repairer { t->meta.smallest.DecodeFrom(key); } t->meta.largest.DecodeFrom(key); + if (parsed.sequence < t->min_sequence) { + t->min_sequence = parsed.sequence; + } if (parsed.sequence > t->max_sequence) { t->max_sequence = parsed.sequence; } @@ -319,7 +324,8 @@ class Repairer { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; edit_->AddFile(0, t.meta.number, t.meta.file_size, - t.meta.smallest, t.meta.largest); + t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/version_edit.cc b/db/version_edit.cc index ed63c10139..65a63947ac 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,10 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + + // these are new formats divergent from open source leveldb + kNewFile2 = 100 // store smallest & largest seqno }; void VersionEdit::Clear() { @@ -76,12 +79,14 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile); + PutVarint32(dst, kNewFile2); PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.number); PutVarint64(dst, f.file_size); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); + PutVarint64(dst, f.smallest_seqno); + PutVarint64(dst, f.largest_seqno); } } @@ -201,6 +206,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kNewFile2: + if (GetLevel(&input, &level, &msg) && + GetVarint64(&input, &f.number) && + GetVarint64(&input, &f.file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno) ) { + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + default: msg = "unknown tag"; break; diff --git a/db/version_edit.h b/db/version_edit.h index 2743e9e0d7..7037763b8c 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -22,6 +22,8 @@ struct FileMetaData { InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? + SequenceNumber smallest_seqno;// The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), being_compacted(false) { } @@ -67,12 +69,17 @@ class VersionEdit { void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, - const InternalKey& largest) { + const InternalKey& largest, + const SequenceNumber& smallest_seqno, + const SequenceNumber& largest_seqno) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; + f.smallest_seqno = smallest_seqno; + f.largest_seqno = largest_seqno; + assert(smallest_seqno <= largest_seqno); new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index b211eb1a99..26b69199e7 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -27,7 +27,9 @@ TEST(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, InternalKey("foo", kBig + 500 + i, kTypeValue), - InternalKey("zoo", kBig + 600 + i, kTypeDeletion)); + InternalKey("zoo", kBig + 600 + i, kTypeDeletion), + kBig + 500 + i, + kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue)); } diff --git a/db/version_set.cc b/db/version_set.cc index 9b01d935e0..a31cbe01a4 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5,6 +5,7 @@ #include "db/version_set.h" #include +#include #include #include "db/filename.h" #include "db/log_reader.h" @@ -309,6 +310,14 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno > b->smallest_seqno) { + assert(a->largest_seqno > b->largest_seqno); + return true; + } + assert(a->largest_seqno <= b->largest_seqno); + return false; +} Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), @@ -375,7 +384,11 @@ void Version::Get(const ReadOptions& options, } if (tmp.empty()) continue; - std::sort(tmp.begin(), tmp.end(), NewestFirst); + if (vset_->options_->hybrid_mode) { + std::sort(tmp.begin(), tmp.end(), NewestFirstBySeqNo); + } else { + std::sort(tmp.begin(), tmp.end(), NewestFirst); + } files = &tmp[0]; num_files = tmp.size(); } else { @@ -1011,7 +1024,10 @@ void VersionSet::Init(int num_levels) { int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; for (int i = 0; i < num_levels; i++) { - if (i > 1) { + if (i == 0) { + max_file_size_[i] = LLONG_MAX; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } else if (i > 1) { max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier * options_->max_bytes_for_level_multiplier_additional[i-1]; @@ -1558,17 +1574,32 @@ void VersionSet::Finalize(Version* v, } } -// a static compator used to sort files based on their size -static bool compareSize(const VersionSet::Fsize& first, +// A static compator used to sort files based on their size +// In normal mode: descending size +static bool compareSizeDescending(const VersionSet::Fsize& first, const VersionSet::Fsize& second) { return (first.file->file_size > second.file->file_size); } +// A static compator used to sort files based on their seqno +// In hybrid mode: descending seqno +static bool compareSeqnoDescending(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + if (first.file->smallest_seqno > second.file->smallest_seqno) { + assert(first.file->largest_seqno > second.file->largest_seqno); + return true; + } + assert(first.file->largest_seqno <= second.file->largest_seqno); + return false; +} // sort all files in level1 to level(n-1) based on file size void VersionSet::UpdateFilesBySize(Version* v) { // No need to sort the highest level because it is never compacted. - for (int level = 0; level < NumberLevels()-1; level++) { + int max_level = options_->hybrid_mode? NumberLevels() : + NumberLevels() - 1; + + for (int level = 0; level < max_level; level++) { const std::vector& files = v->files_[level]; std::vector& files_by_size = v->files_by_size_[level]; @@ -1582,12 +1613,18 @@ void VersionSet::UpdateFilesBySize(Version* v) { } // sort the top number_of_files_to_sort_ based on file size - int num = Version::number_of_files_to_sort_; - if (num > (int)temp.size()) { - num = temp.size(); + if (options_->hybrid_mode) { + int num = temp.size(); + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSeqnoDescending); + } else { + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSizeDescending); } - std::partial_sort(temp.begin(), temp.begin() + num, - temp.end(), compareSize); assert(temp.size() == files.size()); // initialize files_by_size_ @@ -1620,7 +1657,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { const std::vector& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } @@ -1664,6 +1702,23 @@ const char* VersionSet::LevelDataSizeSummary( return scratch->buffer; } +const char* VersionSet::LevelFileSummary( + FileSummaryStorage* scratch, int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "#%ld(seq=%ld,sz=%ld,%d) ", + f->number, f->smallest_seqno, + f->file_size, f->being_compacted); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + // Opens the mainfest file and reads all records // till it finds the record we are looking for. bool VersionSet::ManifestContains(const std::string& record) const { @@ -1961,6 +2016,166 @@ void VersionSet::SizeBeingCompacted(std::vector& sizes) { } } +Compaction* VersionSet::PickCompactionHybrid(int level, double score) { + assert (level == 0); + + // percentage flexibilty while comparing file sizes + uint64_t ratio = 1; + + if ((current_->files_[level].size() <= + (unsigned int)options_->level0_file_num_compaction_trigger)) { + Log(options_->info_log, "XXX Hybrid: nothing to do\n"); + return nullptr; + } + VersionSet::FileSummaryStorage tmp; + Log(options_->info_log, "Hybrid: candidate files(%lu): %s\n", + current_->files_[level].size(), + LevelFileSummary(&tmp, 0)); + + Compaction* c = nullptr; + c = new Compaction(level, level, MaxFileSizeForLevel(level), + LLONG_MAX, NumberLevels()); + c->score_ = score; + + // The files are sorted from newest first to oldest last. + std::vector& file_by_time = current_->files_by_size_[level]; + FileMetaData* f = nullptr; + bool done = false; + assert(file_by_time.size() == current_->files_[level].size()); + + unsigned int max_files_to_compact = UINT_MAX; + + // Make two pass. The first pass considers a candidate file + // only if it is smaller than the total size accumulated so far. + // The second pass does not look at the slope of the + // file-size curve to decide what to pick for compaction. + for (int iter = 0; !done && iter < 2; iter++) { + + for (unsigned int loop = 0; loop < file_by_time.size(); ) { + + // Skip files that are already being compacted + for (f = nullptr; loop < file_by_time.size(); loop++) { + int index = file_by_time[loop]; + f = current_->files_[level][index]; + + if (!f->being_compacted) { + break; + } + Log(options_->info_log, "Hybrid: file %ld[%d] being compacted, skipping", + f->number, loop); + f = nullptr; + } + + // This file is not being compacted. Consider it as the + // first candidate to be compacted. + unsigned int candidate_count = 1; + uint64_t candidate_size = f != nullptr? f->file_size : 0; + if (f != nullptr) { + Log(options_->info_log, "Hybrid: Possible candidate file %ld[%d] %s.", + f->number, loop, iter == 0? "" : "forced "); + } + + // Check if the suceeding files need compaction. + for (unsigned int i = loop+1; + candidate_count < max_files_to_compact && i < file_by_time.size(); + i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + if (f->being_compacted) { + break; + } + // If this is the first iteration, then we pick files if the + // total candidate file size (increased by the specified ratio) + // is still larger than the next candidate file. + if (iter == 0) { + uint64_t sz = (candidate_size * (100 + ratio)) /100; + if (sz < f->file_size) { + break; + } + } + candidate_count++; + candidate_size += f->file_size; + } + + // Found a series of consecutive files that need compaction. + if (candidate_count > 1) { + for (unsigned int i = loop; i < loop + candidate_count; i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + c->inputs_[0].push_back(f); + Log(options_->info_log, "Hybrid: Picking file %ld[%d] with size %ld %s", + f->number, i, f->file_size, + (iter == 0 ? "" : "forced")); + } + done = true; + break; + } else { + for (unsigned int i = loop; + i < loop + candidate_count && i < file_by_time.size(); i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + Log(options_->info_log, "Hybrid: Skipping file %ld[%d] with size %ld %d %s", + f->number, i, f->file_size, f->being_compacted, + (iter == 0 ? "" : "forced")); + } + } + loop += candidate_count; + } + assert(done || c->inputs_[0].size() == 0); + + // If we are unable to find a normal compaction run and we are still + // above the compaction threshold, iterate again to pick compaction + // candidates, this time without considering their size differences. + if (!done) { + int files_not_in_compaction = 0; + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + f = current_->files_[level][i]; + if (!f->being_compacted) { + files_not_in_compaction++; + } + } + int expected_num_files = files_not_in_compaction + + compactions_in_progress_[level].size(); + if (expected_num_files <= + options_->level0_file_num_compaction_trigger + 1) { + done = true; // nothing more to do + } else { + max_files_to_compact = expected_num_files - + options_->level0_file_num_compaction_trigger; + Log(options_->info_log, "Hybrid: second loop with maxfiles %d", + max_files_to_compact); + } + } + } + if (c->inputs_[0].size() <= 1) { + Log(options_->info_log, "XXX Hybrid: only %ld files, nothing to do.\n", + c->inputs_[0].size()); + delete c; + return nullptr; + } + + // validate that all the chosen files are non overlapping in time + FileMetaData* newerfile __attribute__((unused)) = nullptr; + for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { + FileMetaData* f = c->inputs_[0][i]; + assert (f->smallest_seqno <= f->largest_seqno); + assert(newerfile == nullptr || + newerfile->smallest_seqno > f->largest_seqno); + newerfile = f; + } + + c->input_version_ = current_; + c->input_version_->Ref(); + + // mark all the files that are being compacted + c->MarkFilesBeingCompacted(true); + + // remember this currently undergoing compaction + compactions_in_progress_[level].insert(c); + + return c; +} + Compaction* VersionSet::PickCompactionBySize(int level, double score) { Compaction* c = nullptr; @@ -1974,7 +2189,7 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { assert(level >= 0); assert(level+1 < NumberLevels()); - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level+1, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->score_ = score; @@ -2044,6 +2259,13 @@ Compaction* VersionSet::PickCompaction() { current_->vset_->SizeBeingCompacted(size_being_compacted); Finalize(current_, size_being_compacted); + // In hybrid mode compact L0 files back into L0. + if (options_->hybrid_mode) { + int level = 0; + c = PickCompactionHybrid(level, current_->compaction_score_[level]); + return c; + } + // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. // @@ -2072,7 +2294,7 @@ Compaction* VersionSet::PickCompaction() { if (level != 0 || compactions_in_progress_[0].empty()) { if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, &parent_index)) { - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels(), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; @@ -2246,8 +2468,9 @@ Compaction* VersionSet::CompactRange( } } } + int out_level = options_->hybrid_mode ? level : level+1; - Compaction* c = new Compaction(level, MaxFileSizeForLevel(level), + Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->input_version_ = current_; c->input_version_->Ref(); @@ -2261,10 +2484,11 @@ Compaction* VersionSet::CompactRange( return c; } -Compaction::Compaction(int level, uint64_t target_file_size, +Compaction::Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction) : level_(level), + out_level_(out_level), max_output_file_size_(target_file_size), maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), input_version_(nullptr), diff --git a/db/version_set.h b/db/version_set.h index ba924126f4..85c02b9738 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -340,6 +340,9 @@ class VersionSet { struct LevelSummaryStorage { char buffer[100]; }; + struct FileSummaryStorage { + char buffer[1000]; + }; const char* LevelSummary(LevelSummaryStorage* scratch) const; // printf contents (for debugging) @@ -350,6 +353,10 @@ class VersionSet { // of files per level. Uses *scratch as backing store. const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const; + // Return a human-readable short (single-line) summary of files + // in a specified level. Uses *scratch as backing store. + const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + // Return the size of the current manifest file const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } @@ -359,6 +366,9 @@ class VersionSet { // function will return nullptr. Compaction* PickCompactionBySize(int level, double score); + // Pick files to compact in hybrid mode + Compaction* PickCompactionHybrid(int level, double score); + // Free up the files that were participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); @@ -489,9 +499,12 @@ class Compaction { ~Compaction(); // Return the level that is being compacted. Inputs from "level" - // and "level+1" will be merged to produce a set of "level+1" files. + // will be merged. int level() const { return level_; } + // Outputs will go to this level + int output_level() const { return out_level_; } + // Return the object that holds the edits to the descriptor done // by this compaction. VersionEdit* edit() { return edit_; } @@ -534,11 +547,12 @@ class Compaction { friend class Version; friend class VersionSet; - explicit Compaction(int level, uint64_t target_file_size, + explicit Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction = false); int level_; + int out_level_; // levels to which output files are stored uint64_t max_output_file_size_; int64_t maxGrandParentOverlapBytes_; Version* input_version_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index fa69a7eff8..465e83a10f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -476,6 +476,9 @@ struct Options { // Default: 0 uint64_t bytes_per_sync; + // Hybrid Mode. There is only a single level and files in L0 are + // compacted back into L0. Default: false + bool hybrid_mode; }; // Options that control read operations diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 02afe306cf..4c671d84f7 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -88,6 +88,9 @@ static int FLAGS_max_write_buffer_number = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// This is initialized to default value of false +static bool FLAGS_hybrid_mode = false; + // Number of bytes to use as a cache of uncompressed data. static long FLAGS_cache_size = 2 * KB * KB * KB; @@ -930,6 +933,7 @@ class StressTest { options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1016,6 +1020,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; FLAGS_level0_file_num_compaction_trigger = leveldb::Options().level0_file_num_compaction_trigger; FLAGS_level0_slowdown_writes_trigger = @@ -1068,6 +1074,8 @@ int main(int argc, char** argv) { FLAGS_max_write_buffer_number = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/util/options.cc b/util/options.cc index a8222ad5c1..47dae83981 100644 --- a/util/options.cc +++ b/util/options.cc @@ -76,7 +76,8 @@ Options::Options() advise_random_on_open(true), access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), - bytes_per_sync(0) { + bytes_per_sync(0), + hybrid_mode(false) { } static const char* const access_hints[] = { @@ -217,6 +218,8 @@ Options::Dump(Logger* log) const use_adaptive_mutex); Log(log," Options.bytes_per_sync: %ld", bytes_per_sync); + Log(log," Options.hybrid_mode: %d", + hybrid_mode); } // Options::Dump //