diff --git a/HISTORY.md b/HISTORY.md index fee65c8517..3c07550206 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ ### Public API Changes * Change names in CompactionPri and add a new one. * Deprecate options.soft_rate_limit and add options.soft_pending_compaction_bytes_limit. +* If options.max_write_buffer_number > 3, writes will be slowed down when writing to the last write buffer to delay a full stop. ## 4.3.0 (12/8/2015) ### New Features diff --git a/db/column_family.cc b/db/column_family.cc index b2d0a1b0ed..04f5db67d6 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -447,6 +447,16 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "(waiting for flush), max_write_buffer_number is set to %d", name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); + } else if (mutable_cf_options.max_write_buffer_number > 3 && + imm()->NumNotFlushed() >= + mutable_cf_options.max_write_buffer_number - 1) { + write_controller_token_ = write_controller->GetDelayToken(); + internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); + Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, + "[%s] Stalling writes because we have %d immutable memtables " + "(waiting for flush), max_write_buffer_number is set to %d", + name_.c_str(), imm()->NumNotFlushed(), + mutable_cf_options.max_write_buffer_number); } else if (vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); diff --git a/db/db_test.cc b/db/db_test.cc index 86bec7cf67..b1193f48b4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -9144,6 +9144,42 @@ TEST_F(DBTest, SoftLimit) { sleeping_task_low.WakeUp(); sleeping_task_low.WaitUntilDone(); } + +TEST_F(DBTest, LastWriteBufferDelay) { + Options options; + options.env = env_; + options = CurrentOptions(options); + options.write_buffer_size = 100000; + options.max_write_buffer_number = 4; + options.delayed_write_rate = 20000; + options.compression = kNoCompression; + options.disable_auto_compactions = true; + int kNumKeysPerMemtable = 3; + options.memtable_factory.reset( + new SpecialSkipListFactory(kNumKeysPerMemtable)); + + Reopen(options); + test::SleepingBackgroundTask sleeping_task; + // Block flushes + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + sleeping_task.WaitUntilSleeping(); + + // Create 3 L0 files, making score of L0 to be 3. + for (int i = 0; i < 3; i++) { + // Fill one mem table + for (int j = 0; j < kNumKeysPerMemtable; j++) { + Put(Key(j), ""); + } + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + } + // Inserting a new entry would create a new mem table, triggering slow down. + Put(Key(0), ""); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); +} #endif // ROCKSDB_LITE TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 417c88ce34..e74f2c9945 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -700,7 +700,7 @@ void InternalStats::DumpCFStats(std::string* value) { cf_stats_count_[LEVEL0_NUM_FILES_TOTAL] + cf_stats_count_[SOFT_PENDING_COMPACTION_BYTES_LIMIT] + cf_stats_count_[HARD_PENDING_COMPACTION_BYTES_LIMIT] + - cf_stats_count_[MEMTABLE_COMPACTION]; + cf_stats_count_[MEMTABLE_COMPACTION] + cf_stats_count_[MEMTABLE_SLOWDOWN]; // Stats summary across levels PrintLevelStats(buf, sizeof(buf), "Sum", total_files, total_files_being_compacted, total_file_size, 0, w_amp, @@ -734,6 +734,8 @@ void InternalStats::DumpCFStats(std::string* value) { " slowdown for pending_compaction_bytes, " "%" PRIu64 " memtable_compaction, " + "%" PRIu64 + " memtable_slowdown, " "interval %" PRIu64 " total count\n", cf_stats_count_[LEVEL0_SLOWDOWN_TOTAL], cf_stats_count_[LEVEL0_SLOWDOWN_WITH_COMPACTION], @@ -742,6 +744,7 @@ void InternalStats::DumpCFStats(std::string* value) { cf_stats_count_[HARD_PENDING_COMPACTION_BYTES_LIMIT], cf_stats_count_[SOFT_PENDING_COMPACTION_BYTES_LIMIT], cf_stats_count_[MEMTABLE_COMPACTION], + cf_stats_count_[MEMTABLE_SLOWDOWN], total_stall_count - cf_stats_snapshot_.stall_count); value->append(buf); diff --git a/db/internal_stats.h b/db/internal_stats.h index b22c79951d..16aee45a8b 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -86,6 +86,7 @@ class InternalStats { LEVEL0_SLOWDOWN_TOTAL, LEVEL0_SLOWDOWN_WITH_COMPACTION, MEMTABLE_COMPACTION, + MEMTABLE_SLOWDOWN, LEVEL0_NUM_FILES_TOTAL, LEVEL0_NUM_FILES_WITH_COMPACTION, SOFT_PENDING_COMPACTION_BYTES_LIMIT, @@ -343,6 +344,7 @@ class InternalStats { LEVEL0_SLOWDOWN_TOTAL, LEVEL0_SLOWDOWN_WITH_COMPACTION, MEMTABLE_COMPACTION, + MEMTABLE_SLOWDOWN, LEVEL0_NUM_FILES_TOTAL, LEVEL0_NUM_FILES_WITH_COMPACTION, SOFT_PENDING_COMPACTION_BYTES_LIMIT, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index ad33bd86a8..a94571f95d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -256,6 +256,9 @@ struct ColumnFamilyOptions { // The default and the minimum number is 2, so that when 1 write buffer // is being flushed to storage, new writes can continue to the other // write buffer. + // If max_write_buffer_number > 3, writing will be slowed down to + // options.delayed_write_rate if we are writing to the last write buffer + // allowed. // // Default: 2 //