Enable background flush thread by default and fix issues related to it

Summary:
Enable background flush thread in this patch and fix unit tests with:
(1) After background flush, schedule a background compaction if condition satisfied;
(2) Fix a bug that if universal compaction is enabled and number of levels are set to be 0, compaction will not be automatically triggered
(3) Fix unit tests to wait for compaction to finish instead of flush, before checking the compaction results.

Test Plan: pass all unit tests

Reviewers: haobo, xjin, dhruba

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13461
This commit is contained in:
Siying Dong 2013-10-16 13:32:53 -07:00
parent cb5b2baf18
commit 073cbfc8f0
6 changed files with 28 additions and 7 deletions

View File

@ -324,6 +324,7 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) {
Build(10);
dbi->TEST_FlushMemTable();
dbi->TEST_WaitForCompact();
ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));
Corrupt(kTableFile, 100, 1);

View File

@ -1436,24 +1436,25 @@ void DBImpl::BGWorkCompaction(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}
Status DBImpl::BackgroundFlush() {
Status DBImpl::BackgroundFlush(bool* madeProgress) {
Status stat;
while (stat.ok() &&
imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) {
Log(options_.info_log,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_);
stat = FlushMemTableToOutputFile();
stat = FlushMemTableToOutputFile(madeProgress);
}
return stat;
}
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
assert(bg_flush_scheduled_);
MutexLock l(&mutex_);
if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundFlush();
Status s = BackgroundFlush(&madeProgress);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
@ -1469,7 +1470,9 @@ void DBImpl::BackgroundCallFlush() {
}
bg_flush_scheduled_--;
if (madeProgress) {
MaybeScheduleFlushOrCompaction();
}
bg_cv_.SignalAll();
}

View File

@ -186,7 +186,7 @@ class DBImpl : public DB {
void BackgroundCallCompaction();
void BackgroundCallFlush();
Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state);
Status BackgroundFlush();
Status BackgroundFlush(bool* madeProgress);
void CleanupCompaction(CompactionState* compact);
Status DoCompactionWork(CompactionState* compact);

View File

@ -1850,6 +1850,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) {
// but will instead trigger size amplification.
dbfull()->Flush(FlushOptions());
dbfull()->TEST_WaitForCompact();
// Verify that size amplification did occur
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
}

View File

@ -1645,7 +1645,12 @@ void VersionSet::Finalize(Version* v,
double max_score = 0;
int max_score_level = 0;
for (int level = 0; level < NumberLevels()-1; level++) {
int num_levels_to_check =
(options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1;
for (int level = 0; level < num_levels_to_check; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files

View File

@ -312,7 +312,17 @@ class VersionSet {
// Returns true iff some level needs a compaction because it has
// exceeded its target size.
bool NeedsSizeCompaction() const {
for (int i = 0; i < NumberLevels()-1; i++) {
// In universal compaction case, this check doesn't really
// check the compaction condition, but checks num of files threshold
// only. We are not going to miss any compaction opportunity
// but it's likely that more compactions are scheduled but
// ending up with nothing to do. We can improve it later.
// TODO: improve this function to be accurate for universal
// compactions.
int num_levels_to_check =
(options_->compaction_style != kCompactionStyleUniversal) ?
NumberLevels() - 1 : 1;
for (int i = 0; i < num_levels_to_check; i++) {
if (current_->compaction_score_[i] >= 1) {
return true;
}