From a8b9891f95235d14f9119edbe8042cad708b22bc Mon Sep 17 00:00:00 2001 From: Burton Li Date: Thu, 13 Dec 2018 13:16:04 -0800 Subject: [PATCH] Concurrent task limiter for compaction thread control (#4332) Summary: The PR is targeting to resolve the issue of: https://github.com/facebook/rocksdb/issues/3972#issue-330771918 We have a rocksdb created with leveled-compaction with multiple column families (CFs), some of CFs are using HDD to store big and less frequently accessed data and others are using SSD. When there are continuously write traffics going on to all CFs, the compaction thread pool is mostly occupied by those slow HDD compactions, which blocks fully utilize SSD bandwidth. Since atomic write and transaction is needed across CFs, so splitting it to multiple rocksdb instance is not an option for us. With the compaction thread control, we got 30%+ HDD write throughput gain, and also a lot smooth SSD write since less write stall happening. ConcurrentTaskLimiter can be shared with multi-CFs across rocksdb instances, so the feature does not only work for multi-CFs scenarios, but also for multi-rocksdbs scenarios, who need disk IO resource control per tenant. The usage is straight forward: e.g.: // // Enable compaction thread limiter thru ColumnFamilyOptions // std::shared_ptr ctl(NewConcurrentTaskLimiter("foo_limiter", 4)); Options options; ColumnFamilyOptions cf_opt(options); cf_opt.compaction_thread_limiter = ctl; ... // // Compaction thread limiter can be tuned or disabled on-the-fly // ctl->SetMaxOutstandingTask(12); // enlarge to 12 tasks ... ctl->ResetMaxOutstandingTask(); // disable (bypass) thread limiter ctl->SetMaxOutstandingTask(-1); // Same as above ... ctl->SetMaxOutstandingTask(0); // full throttle (0 task) // // Sharing compaction thread limiter among CFs (to resolve multiple storage perf issue) // std::shared_ptr ctl_ssd(NewConcurrentTaskLimiter("ssd_limiter", 8)); std::shared_ptr ctl_hdd(NewConcurrentTaskLimiter("hdd_limiter", 4)); Options options; ColumnFamilyOptions cf_opt_ssd1(options); ColumnFamilyOptions cf_opt_ssd2(options); ColumnFamilyOptions cf_opt_hdd1(options); ColumnFamilyOptions cf_opt_hdd2(options); ColumnFamilyOptions cf_opt_hdd3(options); // SSD CFs cf_opt_ssd1.compaction_thread_limiter = ctl_ssd; cf_opt_ssd2.compaction_thread_limiter = ctl_ssd; // HDD CFs cf_opt_hdd1.compaction_thread_limiter = ctl_hdd; cf_opt_hdd2.compaction_thread_limiter = ctl_hdd; cf_opt_hdd3.compaction_thread_limiter = ctl_hdd; ... // // The limiter is disabled by default (or set to nullptr explicitly) // Options options; ColumnFamilyOptions cf_opt(options); cf_opt.compaction_thread_limiter = nullptr; Pull Request resolved: https://github.com/facebook/rocksdb/pull/4332 Differential Revision: D13226590 Pulled By: siying fbshipit-source-id: 14307aec55b8bd59c8223d04aa6db3c03d1b0c1d --- CMakeLists.txt | 1 + TARGETS | 1 + db/db_compaction_test.cc | 189 ++++++++++++++++++++++ db/db_impl.h | 16 ++ db/db_impl_compaction_flush.cc | 92 ++++++++++- include/rocksdb/concurrent_task_limiter.h | 47 ++++++ include/rocksdb/options.h | 9 ++ options/cf_options.cc | 4 +- options/cf_options.h | 2 + options/options_settable_test.cc | 3 + src.mk | 1 + util/concurrent_task_limiter_impl.cc | 66 ++++++++ util/concurrent_task_limiter_impl.h | 68 ++++++++ 13 files changed, 494 insertions(+), 5 deletions(-) create mode 100644 include/rocksdb/concurrent_task_limiter.h create mode 100644 util/concurrent_task_limiter_impl.cc create mode 100644 util/concurrent_task_limiter_impl.h diff --git a/CMakeLists.txt b/CMakeLists.txt index a8eb397833..49683925ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -597,6 +597,7 @@ set(SOURCES util/comparator.cc util/compression_context_cache.cc util/concurrent_arena.cc + util/concurrent_task_limiter_impl.cc util/crc32c.cc util/delete_scheduler.cc util/dynamic_bloom.cc diff --git a/TARGETS b/TARGETS index 246c2efee2..cc84ef1bb2 100644 --- a/TARGETS +++ b/TARGETS @@ -220,6 +220,7 @@ cpp_library( "util/comparator.cc", "util/compression_context_cache.cc", "util/concurrent_arena.cc", + "util/concurrent_task_limiter_impl.cc", "util/crc32c.cc", "util/delete_scheduler.cc", "util/dynamic_bloom.cc", diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 743ec33baa..b27e46cf5a 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -10,8 +10,10 @@ #include "db/db_test_util.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/concurrent_task_limiter.h" #include "rocksdb/experimental.h" #include "rocksdb/utilities/convenience.h" +#include "util/concurrent_task_limiter_impl.h" #include "util/fault_injection_test_env.h" #include "util/sync_point.h" @@ -3890,6 +3892,193 @@ TEST_F(DBCompactionTest, CompactionHasEmptyOutput) { ASSERT_EQ(2, collector->num_ssts_creation_started()); } +TEST_F(DBCompactionTest, CompactionLimiter) { + const int kNumKeysPerFile = 10; + const int kMaxBackgroundThreads = 64; + + struct CompactionLimiter { + std::string name; + int limit_tasks; + int max_tasks; + int tasks; + std::shared_ptr limiter; + }; + + std::vector limiter_settings; + limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr}); + limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr}); + limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr}); + + for (auto& ls : limiter_settings) { + ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks)); + } + + std::shared_ptr unique_limiter( + NewConcurrentTaskLimiter("unique_limiter", -1)); + + const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5", + "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" }; + const int cf_count = sizeof cf_names / sizeof cf_names[0]; + + std::unordered_map cf_to_limiter; + + Options options = CurrentOptions(); + options.write_buffer_size = 110 * 1024; // 110KB + options.arena_block_size = 4096; + options.num_levels = 3; + options.level0_file_num_compaction_trigger = 4; + options.level0_slowdown_writes_trigger = 64; + options.level0_stop_writes_trigger = 64; + options.max_background_jobs = kMaxBackgroundThreads; // Enough threads + options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); + options.max_write_buffer_number = 10; // Enough memtables + DestroyAndReopen(options); + + std::vector option_vector; + option_vector.reserve(cf_count); + + for (int cf = 0; cf < cf_count; cf++) { + ColumnFamilyOptions cf_opt(options); + if (cf == 0) { + // "Default" CF does't use compaction limiter + cf_opt.compaction_thread_limiter = nullptr; + } else if (cf == 1) { + // "1" CF uses bypass compaction limiter + unique_limiter->SetMaxOutstandingTask(-1); + cf_opt.compaction_thread_limiter = unique_limiter; + } else { + // Assign limiter by mod + auto& ls = limiter_settings[cf % 3]; + cf_opt.compaction_thread_limiter = ls.limiter; + cf_to_limiter[cf_names[cf]] = &ls; + } + option_vector.emplace_back(DBOptions(options), cf_opt); + } + + for (int cf = 1; cf < cf_count; cf++) { + CreateColumnFamilies({cf_names[cf]}, option_vector[cf]); + } + + ReopenWithColumnFamilies(std::vector(cf_names, + cf_names + cf_count), + option_vector); + + port::Mutex mutex; + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) { + const auto& cf_name = static_cast(arg)->GetName(); + auto iter = cf_to_limiter.find(cf_name); + if (iter != cf_to_limiter.end()) { + MutexLock l(&mutex); + ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks); + iter->second->max_tasks = std::max(iter->second->max_tasks, + iter->second->limit_tasks); + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) { + const auto& cf_name = static_cast(arg)->GetName(); + auto iter = cf_to_limiter.find(cf_name); + if (iter != cf_to_limiter.end()) { + MutexLock l(&mutex); + ASSERT_GE(--iter->second->tasks, 0); + } + }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Block all compact threads in thread pool. + const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4; + const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks; + env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH); + env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW); + + test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks]; + + // Block all compaction threads in thread pool. + for (size_t i = 0; i < kTotalCompactTasks; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_compact_tasks[i], Env::LOW); + sleeping_compact_tasks[i].WaitUntilSleeping(); + } + + int keyIndex = 0; + + for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) { + for (int cf = 0; cf < cf_count; cf++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf, Key(keyIndex++), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf, "", "")); + } + + for (int cf = 0; cf < cf_count; cf++) { + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + } + } + + // Enough L0 files to trigger compaction + for (int cf = 0; cf < cf_count; cf++) { + ASSERT_EQ(NumTableFilesAtLevel(0, cf), + options.level0_file_num_compaction_trigger); + } + + // Create more files for one column family, which triggers speed up + // condition, all compactions will be scheduled. + for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(0, Key(i), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(0, "", "")); + dbfull()->TEST_WaitForFlushMemTable(handles_[0]); + ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1, + NumTableFilesAtLevel(0, 0)); + } + + // All CFs are pending compaction + ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW)); + + // Unblock all compaction threads + for (size_t i = 0; i < kTotalCompactTasks; i++) { + sleeping_compact_tasks[i].WakeUp(); + sleeping_compact_tasks[i].WaitUntilDone(); + } + + for (int cf = 0; cf < cf_count; cf++) { + dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); + } + + dbfull()->TEST_WaitForCompact(); + + // Max outstanding compact tasks reached limit + for (auto& ls : limiter_settings) { + ASSERT_EQ(ls.limit_tasks, ls.max_tasks); + ASSERT_EQ(0, ls.limiter->GetOutstandingTask()); + } + + // test manual compaction under a fully throttled limiter + int cf_test = 1; + unique_limiter->SetMaxOutstandingTask(0); + + // flush one more file to cf 1 + for (int i = 0; i < kNumKeysPerFile; i++) { + ASSERT_OK(Put(cf_test, Key(keyIndex++), "")); + } + // put extra key to trigger flush + ASSERT_OK(Put(cf_test, "", "")); + + dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]); + ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test)); + + Compact(cf_test, Key(0), Key(keyIndex)); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(0, unique_limiter->GetOutstandingTask()); +} + INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, ::testing::Values(std::make_tuple(1, true), std::make_tuple(1, false), diff --git a/db/db_impl.h b/db/db_impl.h index 2cabe756ad..1147b29295 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -65,6 +65,7 @@ class Arena; class ArenaWrappedDBIter; class MemTable; class TableCache; +class TaskLimiterToken; class Version; class VersionEdit; class VersionSet; @@ -1106,9 +1107,18 @@ class DBImpl : public DB { const std::vector& inputs, bool* sfm_bookkeeping, LogBuffer* log_buffer); + // Request compaction tasks token from compaction thread limiter. + // It always succeeds if force = true or limiter is disable. + bool RequestCompactionToken(ColumnFamilyData* cfd, bool force, + std::unique_ptr* token, + LogBuffer* log_buffer); + // Schedule background tasks void StartTimedTasks(); + void SubtractCompactionTask(const std::string& device_name, + LogBuffer* log_buffer); + void PrintStatistics(); // dump rocksdb.stats to LOG @@ -1129,6 +1139,10 @@ class DBImpl : public DB { ColumnFamilyData* PopFirstFromCompactionQueue(); FlushRequest PopFirstFromFlushQueue(); + // Pick the first unthrottled compaction with task token from queue. + ColumnFamilyData* PickCompactionFromQueue( + std::unique_ptr* token, LogBuffer* log_buffer); + // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status); @@ -1422,6 +1436,8 @@ class DBImpl : public DB { // caller retains ownership of `manual_compaction_state` as it is reused // across background compactions. ManualCompactionState* manual_compaction_state; // nullptr if non-manual + // task limiter token is requested during compaction picking. + std::unique_ptr task_token; }; std::deque manual_compaction_dequeue_; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 1c32ec6757..c2623161fe 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -20,6 +20,7 @@ #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_updater.h" #include "monitoring/thread_status_util.h" +#include "util/concurrent_task_limiter_impl.h" #include "util/sst_file_manager_impl.h" #include "util/sync_point.h" @@ -59,6 +60,27 @@ bool DBImpl::EnoughRoomForCompaction( return enough_room; } +bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, + std::unique_ptr* token, + LogBuffer* log_buffer) { + assert(*token == nullptr); + auto limiter = static_cast( + cfd->ioptions()->compaction_thread_limiter.get()); + if (limiter == nullptr) { + return true; + } + *token = limiter->GetToken(force); + if (*token != nullptr) { + ROCKS_LOG_BUFFER(log_buffer, + "Thread limiter [%s] increase [%s] compaction task, " + "force: %s, tasks after: %d", + limiter->GetName().c_str(), cfd->GetName().c_str(), + force ? "true" : "false", limiter->GetOutstandingTask()); + return true; + } + return false; +} + Status DBImpl::SyncClosedLogs(JobContext* job_context) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); mutex_.AssertHeld(); @@ -1354,6 +1376,8 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual compaction starting", cfd->GetName().c_str()); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + immutable_db_options_.info_log.get()); // We don't check bg_error_ here, because if we get the error in compaction, // the compaction will set manual.status to bg_error_ and set manual.done to // true. @@ -1391,6 +1415,12 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->manual_compaction_state = &manual; ca->prepicked_compaction->compaction = compaction; + if (!RequestCompactionToken(cfd, true, + &ca->prepicked_compaction->task_token, + &log_buffer)) { + // Don't throttle manual compaction, only count outstanding tasks. + assert(false); + } manual.incomplete = false; bg_compaction_scheduled_++; env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this, @@ -1399,6 +1429,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, } } + log_buffer.FlushBufferToLog(); assert(!manual.in_progress); assert(HasPendingManualCompaction()); RemoveManualCompaction(&manual); @@ -1824,6 +1855,32 @@ DBImpl::FlushRequest DBImpl::PopFirstFromFlushQueue() { return flush_req; } +ColumnFamilyData* DBImpl::PickCompactionFromQueue( + std::unique_ptr* token, LogBuffer* log_buffer) { + assert(!compaction_queue_.empty()); + assert(*token == nullptr); + autovector throttled_candidates; + ColumnFamilyData* cfd = nullptr; + while (!compaction_queue_.empty()) { + auto first_cfd = *compaction_queue_.begin(); + compaction_queue_.pop_front(); + assert(first_cfd->queued_for_compaction()); + if (!RequestCompactionToken(first_cfd, false, token, log_buffer)) { + throttled_candidates.push_back(first_cfd); + continue; + } + cfd = first_cfd; + cfd->set_queued_for_compaction(false); + break; + } + // Add throttled compaction candidates back to queue in the original order. + for (auto iter = throttled_candidates.rbegin(); + iter != throttled_candidates.rend(); ++iter) { + compaction_queue_.push_front(*iter); + } + return cfd; +} + void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, FlushReason flush_reason) { if (flush_req.empty()) { @@ -2081,7 +2138,12 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer, prepicked_compaction); TEST_SYNC_POINT("BackgroundCallCompaction:1"); - if (!s.ok() && !s.IsShutdownInProgress()) { + if (s.IsBusy()) { + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + mutex_.Unlock(); + env_->SleepForMicroseconds(10000); // prevent hot loop + mutex_.Lock(); + } else if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to // chew up resources for failed compactions for the duration of @@ -2216,6 +2278,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, manual_compaction->in_progress = true; } + std::unique_ptr task_token; + // InternalKey manual_end_storage; // InternalKey* manual_end = &manual_end_storage; bool sfm_reserved_compact_space = false; @@ -2267,17 +2331,23 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, return Status::OK(); } - // cfd is referenced here - auto cfd = PopFirstFromCompactionQueue(); + auto cfd = PickCompactionFromQueue(&task_token, log_buffer); + if (cfd == nullptr) { + // Can't find any executable task from the compaction queue. + // All tasks have been throttled by compaction thread limiter. + ++unscheduled_compactions_; + return Status::Busy(); + } + // We unreference here because the following code will take a Ref() on // this cfd if it is going to use it (Compaction class holds a // reference). // This will all happen under a mutex so we don't have to be afraid of // somebody else deleting it. if (cfd->Unref()) { - delete cfd; // This was the last reference of the column family, so no need to // compact. + delete cfd; return Status::OK(); } @@ -2347,6 +2417,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else if (c->deletion_compaction()) { // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old // file if there is alive snapshot pointing to it + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", + c->column_family_data()); assert(c->num_input_files(1) == 0); assert(c->level() == 0); assert(c->column_family_data()->ioptions()->compaction_style == @@ -2370,8 +2442,12 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, c->column_family_data()->GetName().c_str(), c->num_input_files(0)); *made_progress = true; + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", + c->column_family_data()); } else if (!trivial_move_disallowed && c->IsTrivialMove()) { TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove"); + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", + c->column_family_data()); // Instrument for event update // TODO(yhchiang): add op details for showing trivial-move. ThreadStatusUtil::SetColumnFamily( @@ -2437,6 +2513,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // Clear Instrument ThreadStatusUtil::ResetThreadStatus(); + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", + c->column_family_data()); } else if (!is_prepicked && c->output_level() > 0 && c->output_level() == c->column_family_data() @@ -2454,10 +2532,14 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, ca->prepicked_compaction = new PrepickedCompaction; ca->prepicked_compaction->compaction = c.release(); ca->prepicked_compaction->manual_compaction_state = nullptr; + // Transfer requested token, so it doesn't need to do it again. + ca->prepicked_compaction->task_token = std::move(task_token); ++bg_bottom_compaction_scheduled_; env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM, this, &DBImpl::UnscheduleCallback); } else { + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction", + c->column_family_data()); int output_level __attribute__((__unused__)); output_level = c->output_level(); TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial", @@ -2498,6 +2580,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *c->mutable_cf_options(), FlushReason::kAutoCompaction); } *made_progress = true; + TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", + c->column_family_data()); } if (c != nullptr) { c->ReleaseCompactionFiles(status); diff --git a/include/rocksdb/concurrent_task_limiter.h b/include/rocksdb/concurrent_task_limiter.h new file mode 100644 index 0000000000..35ca1149c3 --- /dev/null +++ b/include/rocksdb/concurrent_task_limiter.h @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/env.h" +#include "rocksdb/statistics.h" + +namespace rocksdb { + +class ConcurrentTaskLimiter { + public: + + virtual ~ConcurrentTaskLimiter() {} + + // Returns a name that identifies this concurrent task limiter. + virtual const std::string& GetName() const = 0; + + // Set max concurrent tasks. + // limit = 0 means no new task allowed. + // limit < 0 means no limitation. + virtual void SetMaxOutstandingTask(int32_t limit) = 0; + + // Reset to unlimited max concurrent task. + virtual void ResetMaxOutstandingTask() = 0; + + // Returns current outstanding task count. + virtual int32_t GetOutstandingTask() const = 0; +}; + +// Create a ConcurrentTaskLimiter that can be shared with mulitple CFs +// across RocksDB instances to control concurrent tasks. +// +// @param name: Name of the limiter. +// @param limit: max concurrent tasks. +// limit = 0 means no new task allowed. +// limit < 0 means no limitation. +extern ConcurrentTaskLimiter* NewConcurrentTaskLimiter( + const std::string& name, int32_t limit); + +} // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c3ba448394..dd55e70362 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -34,6 +34,7 @@ class Cache; class CompactionFilter; class CompactionFilterFactory; class Comparator; +class ConcurrentTaskLimiter; class Env; enum InfoLogLevel : unsigned char; class SstFileManager; @@ -293,6 +294,14 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // Default: empty std::vector cf_paths; + // Compaction concurrent thread limiter for the column family. + // If non-nullptr, use given concurrent thread limiter to control + // the max outstanding compaction tasks. Limiter can be shared with + // multiple column families across db instances. + // + // Default: nullptr + std::shared_ptr compaction_thread_limiter = nullptr; + // Create ColumnFamilyOptions with default values for all fields ColumnFamilyOptions(); // Create ColumnFamilyOptions from Options diff --git a/options/cf_options.cc b/options/cf_options.cc index 37ef710650..74a6c47e6e 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -17,6 +17,7 @@ #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/options.h" +#include "rocksdb/concurrent_task_limiter.h" namespace rocksdb { @@ -75,7 +76,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options, max_subcompactions(db_options.max_subcompactions), memtable_insert_with_hint_prefix_extractor( cf_options.memtable_insert_with_hint_prefix_extractor.get()), - cf_paths(cf_options.cf_paths) {} + cf_paths(cf_options.cf_paths), + compaction_thread_limiter(cf_options.compaction_thread_limiter) {} // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/options/cf_options.h b/options/cf_options.h index 69b0b0105a..0cc5ef5a53 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -120,6 +120,8 @@ struct ImmutableCFOptions { const SliceTransform* memtable_insert_with_hint_prefix_extractor; std::vector cf_paths; + + std::shared_ptr compaction_thread_limiter; }; struct MutableCFOptions { diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index cad1af3d76..9ff7c7814d 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -351,6 +351,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offset_of(&ColumnFamilyOptions::cf_paths), sizeof(std::vector)}, + {offset_of(&ColumnFamilyOptions::compaction_thread_limiter), + sizeof(std::shared_ptr)}, }; char* options_ptr = new char[sizeof(ColumnFamilyOptions)]; @@ -389,6 +391,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { options->soft_rate_limit = 0; options->purge_redundant_kvs_while_flush = false; options->max_mem_compaction_level = 0; + options->compaction_filter = nullptr; char* new_options_ptr = new char[sizeof(ColumnFamilyOptions)]; ColumnFamilyOptions* new_options = diff --git a/src.mk b/src.mk index 5ba7f4b7c0..0926fc12c8 100644 --- a/src.mk +++ b/src.mk @@ -137,6 +137,7 @@ LIB_SOURCES = \ util/comparator.cc \ util/compression_context_cache.cc \ util/concurrent_arena.cc \ + util/concurrent_task_limiter_impl.cc \ util/crc32c.cc \ util/delete_scheduler.cc \ util/dynamic_bloom.cc \ diff --git a/util/concurrent_task_limiter_impl.cc b/util/concurrent_task_limiter_impl.cc new file mode 100644 index 0000000000..098028b195 --- /dev/null +++ b/util/concurrent_task_limiter_impl.cc @@ -0,0 +1,66 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "util/concurrent_task_limiter_impl.h" +#include "rocksdb/concurrent_task_limiter.h" + +namespace rocksdb { + +ConcurrentTaskLimiterImpl::ConcurrentTaskLimiterImpl( + const std::string& name, int32_t max_outstanding_task) + : name_(name), + max_outstanding_tasks_{max_outstanding_task}, + outstanding_tasks_{0} { + +} + +ConcurrentTaskLimiterImpl::~ConcurrentTaskLimiterImpl() { +} + +const std::string& ConcurrentTaskLimiterImpl::GetName() const { + return name_; +} + +void ConcurrentTaskLimiterImpl::SetMaxOutstandingTask(int32_t limit) { + max_outstanding_tasks_.store(limit, std::memory_order_relaxed); +} + +void ConcurrentTaskLimiterImpl::ResetMaxOutstandingTask() { + max_outstanding_tasks_.store(-1, std::memory_order_relaxed); +} + +int32_t ConcurrentTaskLimiterImpl::GetOutstandingTask() const { + return outstanding_tasks_.load(std::memory_order_relaxed); +} + +std::unique_ptr ConcurrentTaskLimiterImpl::GetToken( + bool force) { + int32_t limit = max_outstanding_tasks_.load(std::memory_order_relaxed); + int32_t tasks = outstanding_tasks_.load(std::memory_order_relaxed); + // force = true, bypass the throttle. + // limit < 0 means unlimited tasks. + while (force || limit < 0 || tasks < limit) { + if (outstanding_tasks_.compare_exchange_weak(tasks, tasks + 1)) { + return std::unique_ptr(new TaskLimiterToken(this)); + } + } + return nullptr; +} + +ConcurrentTaskLimiter* NewConcurrentTaskLimiter( + const std::string& name, int32_t limit) { + return new ConcurrentTaskLimiterImpl(name, limit); +} + +TaskLimiterToken::~TaskLimiterToken() { + --limiter_->outstanding_tasks_; + assert(limiter_->outstanding_tasks_ >= 0); +} + +} // namespace rocksdb diff --git a/util/concurrent_task_limiter_impl.h b/util/concurrent_task_limiter_impl.h new file mode 100644 index 0000000000..515f1481e0 --- /dev/null +++ b/util/concurrent_task_limiter_impl.h @@ -0,0 +1,68 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include + +#include "rocksdb/env.h" +#include "rocksdb/concurrent_task_limiter.h" + +namespace rocksdb { + +class TaskLimiterToken; + +class ConcurrentTaskLimiterImpl : public ConcurrentTaskLimiter { + public: + explicit ConcurrentTaskLimiterImpl(const std::string& name, + int32_t max_outstanding_task); + + virtual ~ConcurrentTaskLimiterImpl(); + + virtual const std::string& GetName() const override; + + virtual void SetMaxOutstandingTask(int32_t limit) override; + + virtual void ResetMaxOutstandingTask() override; + + virtual int32_t GetOutstandingTask() const override; + + // Request token for adding a new task. + // If force == true, it requests a token bypassing throttle. + // Returns nullptr if it got throttled. + virtual std::unique_ptr GetToken(bool force); + + private: + friend class TaskLimiterToken; + + std::string name_; + std::atomic max_outstanding_tasks_; + std::atomic outstanding_tasks_; + + // No copying allowed + ConcurrentTaskLimiterImpl(const ConcurrentTaskLimiterImpl&) = delete; + ConcurrentTaskLimiterImpl& operator=( + const ConcurrentTaskLimiterImpl&) = delete; +}; + +class TaskLimiterToken { + public: + explicit TaskLimiterToken(ConcurrentTaskLimiterImpl* limiter) + : limiter_(limiter) {} + ~TaskLimiterToken(); + + private: + ConcurrentTaskLimiterImpl* limiter_; + + // no copying allowed + TaskLimiterToken(const TaskLimiterToken&) = delete; + void operator=(const TaskLimiterToken&) = delete; +}; + +} // namespace rocksdb