From 96206531bc0bb56d87012921c5458c8a3047a6b3 Mon Sep 17 00:00:00 2001 From: zczhu <> Date: Fri, 8 Jul 2022 19:48:09 -0700 Subject: [PATCH] Support reservation in thread pool (#10278) Summary: Add `ReserveThreads` and `ReleaseThreads` functions in thread pool to support reservation in for a specific thread pool. With this feature, a thread will be blocked if the number of waiting threads (noted by `num_waiting_threads_`) equals the number of reserved threads (noted by `reserved_threads_`), normally `reserved_threads_` is upper bounded by `num_waiting_threads_`; in rare cases (e.g. `SetBackgroundThreadsInternal` is called when some threads are already reserved), `num_waiting_threads_` can be less than `reserved_threads`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10278 Test Plan: Add `ReserveThreads` unit test in `env_test`. Update the unit test `SimpleColumnFamilyInfoTest` in `thread_list_test` with adding `ReserveThreads` related assertions. Reviewed By: hx235 Differential Revision: D37640946 Pulled By: littlepig2013 fbshipit-source-id: 4d691f6b9a433569f96ab52d52c3defe5b065367 --- HISTORY.md | 1 + env/composite_env_wrapper.h | 8 +++ env/env_posix.cc | 14 ++++ env/env_test.cc | 131 ++++++++++++++++++++++++++++++++++- include/rocksdb/env.h | 20 ++++++ include/rocksdb/threadpool.h | 9 +++ port/win/env_win.cc | 17 +++++ port/win/env_win.h | 8 +++ util/thread_list_test.cc | 19 ++++- util/threadpool_imp.cc | 71 +++++++++++++++++-- util/threadpool_imp.h | 8 +++ 11 files changed, 300 insertions(+), 6 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d69c195b7d..385f821f32 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * `rocksdb_file_metadata_t` and its and get functions & destroy functions. * Add suggest_compact_range() and suggest_compact_range_cf() to C API. * When using block cache strict capacity limit (`LRUCache` with `strict_capacity_limit=true`), DB operations now fail with Status code `kAborted` subcode `kMemoryLimit` (`IsMemoryLimit()`) instead of `kIncomplete` (`IsIncomplete()`) when the capacity limit is reached, because Incomplete can mean other specific things for some operations. In more detail, `Cache::Insert()` now returns the updated Status code and this usually propagates through RocksDB to the user on failure. +* Add two functions `int ReserveThreads(int threads_to_be_reserved)` and `int ReleaseThreads(threads_to_be_released)` into `Env` class. In the default implementation, both return 0. Newly added `xxxEnv` class that inherits `Env` should implement these two functions for thread reservation/releasing features. ### Bug Fixes * Fix a bug in which backup/checkpoint can include a WAL deleted by RocksDB. diff --git a/env/composite_env_wrapper.h b/env/composite_env_wrapper.h index d842fcf071..78da6f0ed6 100644 --- a/env/composite_env_wrapper.h +++ b/env/composite_env_wrapper.h @@ -322,6 +322,14 @@ class CompositeEnvWrapper : public CompositeEnv { return target_.env->GetThreadPoolQueueLen(pri); } + int ReserveThreads(int threads_to_be_reserved, Priority pri) override { + return target_.env->ReserveThreads(threads_to_be_reserved, pri); + } + + int ReleaseThreads(int threads_to_be_released, Priority pri) override { + return target_.env->ReleaseThreads(threads_to_be_released, pri); + } + Status GetHostName(char* name, uint64_t len) override { return target_.env->GetHostName(name, len); } diff --git a/env/env_posix.cc b/env/env_posix.cc index 5698fcdd72..1e648bc947 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -302,6 +302,10 @@ class PosixEnv : public CompositeEnv { unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + int ReserveThreads(int threads_to_be_reserved, Priority pri) override; + + int ReleaseThreads(int threads_to_be_released, Priority pri) override; + Status GetThreadList(std::vector* thread_list) override { assert(thread_status_updater_); return thread_status_updater_->GetThreadList(thread_list); @@ -437,6 +441,16 @@ unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { return thread_pools_[pri].GetQueueLen(); } +int PosixEnv::ReserveThreads(int threads_to_reserved, Priority pri) { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].ReserveThreads(threads_to_reserved); +} + +int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].ReleaseThreads(threads_to_released); +} + struct StartThreadState { void (*user_function)(void*); void* arg; diff --git a/env/env_test.cc b/env/env_test.cc index 16741cf9e8..91e29627f8 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -839,6 +839,135 @@ TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { WaitThreadPoolsEmpty(); } +TEST_P(EnvPosixTestWithParam, ReserveThreads) { + // Initialize the background thread to 1 in case other threads exist + // from the last unit test + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 1); + constexpr int kWaitMicros = 10000000; // 10seconds + std::vector tasks(4); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + // Set the sync point to ensure thread 0 can terminate + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"ThreadPoolImpl::BGThread::Termination:th0", + "EnvTest::ReserveThreads:0"}}); + // Empty the thread pool to ensure all the threads can start later + env_->SetBackgroundThreads(0, Env::Priority::HIGH); + TEST_SYNC_POINT("EnvTest::ReserveThreads:0"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + // Set the sync point to ensure threads start and pass the sync point + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"ThreadPoolImpl::BGThread::Start:th0", "EnvTest::ReserveThreads:1"}, + {"ThreadPoolImpl::BGThread::Start:th1", "EnvTest::ReserveThreads:2"}, + {"ThreadPoolImpl::BGThread::Start:th2", "EnvTest::ReserveThreads:3"}, + {"ThreadPoolImpl::BGThread::Start:th3", "EnvTest::ReserveThreads:4"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + // Set number of thread to 3 first. + env_->SetBackgroundThreads(3, Env::Priority::HIGH); + ASSERT_EQ(env_->GetBackgroundThreads(Env::HIGH), 3); + // Add sync points to ensure all 3 threads start + TEST_SYNC_POINT("EnvTest::ReserveThreads:1"); + TEST_SYNC_POINT("EnvTest::ReserveThreads:2"); + TEST_SYNC_POINT("EnvTest::ReserveThreads:3"); + // Reserve 2 threads + ASSERT_EQ(2, env_->ReserveThreads(2, Env::Priority::HIGH)); + + // Schedule 3 tasks. Task 0 running (in this context, doing + // SleepingBackgroundTask); Task 1, 2 waiting; 3 reserved threads. + for (size_t i = 0; i < 3; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + } + ASSERT_FALSE(tasks[0].TimedWaitUntilSleeping(kWaitMicros)); + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Release 2 threads. Task 0, 1, 2 running; 0 reserved thread. + ASSERT_EQ(2, env_->ReleaseThreads(2, Env::Priority::HIGH)); + ASSERT_FALSE(tasks[1].TimedWaitUntilSleeping(kWaitMicros)); + ASSERT_FALSE(tasks[2].TimedWaitUntilSleeping(kWaitMicros)); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(tasks[2].IsSleeping()); + // No more threads can be reserved + ASSERT_EQ(0, env_->ReserveThreads(3, Env::Priority::HIGH)); + // Expand the number of background threads so that the last thread + // is waiting + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + // Add sync point to ensure the 4th thread starts + TEST_SYNC_POINT("EnvTest::ReserveThreads:4"); + // As the thread pool is expanded, we can reserve one more thread + ASSERT_EQ(1, env_->ReserveThreads(3, Env::Priority::HIGH)); + // No more threads can be reserved + ASSERT_EQ(0, env_->ReserveThreads(3, Env::Priority::HIGH)); + + // Reset the sync points for the next iteration in BGThread or the + // next time Submit() is called + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"ThreadPoolImpl::BGThread::WaitingThreadsInc", + "EnvTest::ReserveThreads:5"}, + {"ThreadPoolImpl::BGThread::Termination", "EnvTest::ReserveThreads:6"}, + {"ThreadPoolImpl::Submit::Enqueue", "EnvTest::ReserveThreads:7"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + tasks[0].WakeUp(); + ASSERT_FALSE(tasks[0].TimedWaitUntilDone(kWaitMicros)); + // Add sync point to ensure the number of waiting threads increases + TEST_SYNC_POINT("EnvTest::ReserveThreads:5"); + // 1 more thread can be reserved + ASSERT_EQ(1, env_->ReserveThreads(3, Env::Priority::HIGH)); + // 2 reserved threads now + + // Currently, two threads are blocked since the number of waiting + // threads is equal to the number of reserved threads (i.e., 2). + // If we reduce the number of background thread to 1, at least one thread + // will be the last excessive thread (here we have no control over the + // number of excessive threads because thread order does not + // necessarily follows the schedule order, but we ensure that the last thread + // shall not run any task by expanding the thread pool after we schedule + // the tasks), and thus they(it) become(s) unblocked, the number of waiting + // threads decreases to 0 or 1, but the number of reserved threads is still 2 + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + + // Task 1,2 running; 2 reserved threads, however, in fact, we only have + // 0 or 1 waiting thread in the thread pool, proved by the + // following test, we CANNOT reserve 2 threads even though we just + // release 2 + TEST_SYNC_POINT("EnvTest::ReserveThreads:6"); + ASSERT_EQ(2, env_->ReleaseThreads(2, Env::Priority::HIGH)); + ASSERT_GT(2, env_->ReserveThreads(2, Env::Priority::HIGH)); + + // Every new task will be put into the queue at this point + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[3], + Env::Priority::HIGH); + TEST_SYNC_POINT("EnvTest::ReserveThreads:7"); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(!tasks[3].IsSleeping()); + + // Set the number of threads to 3 so that Task 3 can dequeue + env_->SetBackgroundThreads(3, Env::Priority::HIGH); + // Wakup Task 1 + tasks[1].WakeUp(); + ASSERT_FALSE(tasks[1].TimedWaitUntilDone(kWaitMicros)); + // Task 2, 3 running (Task 3 dequeue); 0 or 1 reserved thread + ASSERT_FALSE(tasks[3].TimedWaitUntilSleeping(kWaitMicros)); + ASSERT_TRUE(tasks[3].IsSleeping()); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // At most 1 thread can be released + ASSERT_GT(2, env_->ReleaseThreads(3, Env::Priority::HIGH)); + tasks[2].WakeUp(); + ASSERT_FALSE(tasks[2].TimedWaitUntilDone(kWaitMicros)); + tasks[3].WakeUp(); + ASSERT_FALSE(tasks[3].TimedWaitUntilDone(kWaitMicros)); + WaitThreadPoolsEmpty(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + #if (defined OS_LINUX || defined OS_WIN) // Travis doesn't support fallocate or getting unique ID from files for whatever // reason. @@ -1271,8 +1400,8 @@ TEST_P(EnvPosixTestWithParam, MultiRead) { } } }); - ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); std::unique_ptr file; std::vector reqs(3); std::vector> data; diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 52b0852d23..bef60a2124 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -493,6 +493,17 @@ class Env : public Customizable { // Wait for all threads started by StartThread to terminate. virtual void WaitForJoin() {} + // Reserve available background threads in the specified thread pool. + virtual int ReserveThreads(int /*threads_to_be_reserved*/, Priority /*pri*/) { + return 0; + } + + // Release a specific number of reserved threads from the specified thread + // pool + virtual int ReleaseThreads(int /*threads_to_be_released*/, Priority /*pri*/) { + return 0; + } + // Get thread pool queue length for specific thread pool. virtual unsigned int GetThreadPoolQueueLen(Priority /*pri*/ = LOW) const { return 0; @@ -1533,6 +1544,15 @@ class EnvWrapper : public Env { unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override { return target_.env->GetThreadPoolQueueLen(pri); } + + int ReserveThreads(int threads_to_be_reserved, Priority pri) override { + return target_.env->ReserveThreads(threads_to_be_reserved, pri); + } + + int ReleaseThreads(int threads_to_be_released, Priority pri) override { + return target_.env->ReleaseThreads(threads_to_be_released, pri); + } + Status GetTestDirectory(std::string* path) override { return target_.env->GetTestDirectory(path); } diff --git a/include/rocksdb/threadpool.h b/include/rocksdb/threadpool.h index b39321fe8f..f1cc557524 100644 --- a/include/rocksdb/threadpool.h +++ b/include/rocksdb/threadpool.h @@ -49,6 +49,15 @@ class ThreadPool { virtual void SubmitJob(const std::function&) = 0; // This moves the function in for efficiency virtual void SubmitJob(std::function&&) = 0; + + // Reserve available background threads. This function does not ensure + // so many threads can be reserved, instead it will return the number of + // threads that can be reserved against the desired one. In other words, + // the number of available threads could be less than the input. + virtual int ReserveThreads(int /*threads_to_be_reserved*/) { return 0; } + + // Release a specific number of reserved threads + virtual int ReleaseThreads(int /*threads_to_be_released*/) { return 0; } }; // NewThreadPool() is a function that could be used to create a ThreadPool diff --git a/port/win/env_win.cc b/port/win/env_win.cc index f856d8ccbc..2262eb59c4 100644 --- a/port/win/env_win.cc +++ b/port/win/env_win.cc @@ -1322,6 +1322,16 @@ unsigned int WinEnvThreads::GetThreadPoolQueueLen(Env::Priority pri) const { return thread_pools_[pri].GetQueueLen(); } +int WinEnvThreads::ReserveThreads(int threads_to_reserved, Env::Priority pri) { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + return thread_pools_[pri].ReserveThreads(threads_to_reserved); +} + +int WinEnvThreads::ReleaseThreads(int threads_to_released, Env::Priority pri) { + assert(pri >= Env::Priority::BOTTOM && pri <= Env::Priority::HIGH); + return thread_pools_[pri].ReleaseThreads(threads_to_released); +} + uint64_t WinEnvThreads::gettid() { uint64_t thread_id = GetCurrentThreadId(); return thread_id; @@ -1388,6 +1398,13 @@ void WinEnv::WaitForJoin() { return winenv_threads_.WaitForJoin(); } unsigned int WinEnv::GetThreadPoolQueueLen(Env::Priority pri) const { return winenv_threads_.GetThreadPoolQueueLen(pri); } +int WinEnv::ReserveThreads(int threads_to_reserved, Env::Priority pri) { + return winenv_threads_.ReserveThreads(threads_to_reserved, pri); +} + +int WinEnv::ReleaseThreads(int threads_to_released, Env::Priority pri) { + return winenv_threads_.ReleaseThreads(threads_to_released, pri); +} uint64_t WinEnv::GetThreadID() const { return winenv_threads_.GetThreadID(); } diff --git a/port/win/env_win.h b/port/win/env_win.h index 6771d8cda3..8fbfb8246c 100644 --- a/port/win/env_win.h +++ b/port/win/env_win.h @@ -57,6 +57,10 @@ class WinEnvThreads { unsigned int GetThreadPoolQueueLen(Env::Priority pri) const; + int ReserveThreads(int threads_to_be_reserved, Env::Priority pri); + + int ReleaseThreads(int threads_to_be_released, Env::Priority pri); + static uint64_t gettid(); uint64_t GetThreadID() const; @@ -279,6 +283,10 @@ class WinEnv : public CompositeEnv { unsigned int GetThreadPoolQueueLen(Env::Priority pri) const override; + int ReserveThreads(int threads_to_be_reserved, Env::Priority pri) override; + + int ReleaseThreads(int threads_to_be_released, Env::Priority pri) override; + uint64_t GetThreadID() const override; // Allow increasing the number of worker threads. diff --git a/util/thread_list_test.cc b/util/thread_list_test.cc index 8ce31909fc..65da2edeb9 100644 --- a/util/thread_list_test.cc +++ b/util/thread_list_test.cc @@ -126,9 +126,11 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { const int kLowPriorityThreads = 5; const int kSimulatedHighPriThreads = kHighPriorityThreads - 1; const int kSimulatedLowPriThreads = kLowPriorityThreads / 3; + const int kDelayMicros = 1000000; env->SetBackgroundThreads(kHighPriorityThreads, Env::HIGH); env->SetBackgroundThreads(kLowPriorityThreads, Env::LOW); - + // Wait 1 second so that threads start + Env::Default()->SleepForMicroseconds(kDelayMicros); SimulatedBackgroundTask running_task( reinterpret_cast(1234), "running", reinterpret_cast(5678), "pikachu"); @@ -137,13 +139,20 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, &running_task, Env::Priority::HIGH); } + for (int test = 0; test < kSimulatedLowPriThreads; ++test) { env->Schedule(&SimulatedBackgroundTask::DoSimulatedTask, &running_task, Env::Priority::LOW); } running_task.WaitUntilScheduled(kSimulatedHighPriThreads + kSimulatedLowPriThreads); + // We can only reserve limited number of waiting threads + ASSERT_EQ(kHighPriorityThreads - kSimulatedHighPriThreads, + env->ReserveThreads(kHighPriorityThreads, Env::Priority::HIGH)); + ASSERT_EQ(kLowPriorityThreads - kSimulatedLowPriThreads, + env->ReserveThreads(kLowPriorityThreads, Env::Priority::LOW)); + // Reservation shall not affect the existing thread list std::vector thread_list; // Verify the number of running threads in each pool. @@ -155,6 +164,10 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { running_count[thread_status.thread_type]++; } } + // Cannot reserve more threads + ASSERT_EQ(0, env->ReserveThreads(kHighPriorityThreads, Env::Priority::HIGH)); + ASSERT_EQ(0, env->ReserveThreads(kLowPriorityThreads, Env::Priority::LOW)); + ASSERT_EQ( running_count[ThreadStatus::HIGH_PRIORITY], kSimulatedHighPriThreads); @@ -167,6 +180,10 @@ TEST_F(ThreadListTest, SimpleColumnFamilyInfoTest) { running_task.FinishAllTasks(); running_task.WaitUntilDone(); + ASSERT_EQ(kHighPriorityThreads - kSimulatedHighPriThreads, + env->ReleaseThreads(kHighPriorityThreads, Env::Priority::HIGH)); + ASSERT_EQ(kLowPriorityThreads - kSimulatedLowPriThreads, + env->ReleaseThreads(kLowPriorityThreads, Env::Priority::LOW)); // Verify none of the threads are running ASSERT_OK(env->GetThreadList(&thread_list)); diff --git a/util/threadpool_imp.cc b/util/threadpool_imp.cc index dc166e13cf..e6d88213f2 100644 --- a/util/threadpool_imp.cc +++ b/util/threadpool_imp.cc @@ -100,6 +100,30 @@ struct ThreadPoolImpl::Impl { // Set the thread priority. void SetThreadPriority(Env::Priority priority) { priority_ = priority; } + int ReserveThreads(int threads_to_be_reserved) { + std::unique_lock lock(mu_); + // We can reserve at most num_waiting_threads_ in total so the number of + // threads that can be reserved might be fewer than the desired one. In + // rare cases, num_waiting_threads_ could be less than reserved_threads + // due to SetBackgroundThreadInternal or last excessive threads. If that + // happens, we cannot reserve any other threads. + int reserved_threads_in_success = + std::min(std::max(num_waiting_threads_ - reserved_threads_, 0), + threads_to_be_reserved); + reserved_threads_ += reserved_threads_in_success; + return reserved_threads_in_success; + } + + int ReleaseThreads(int threads_to_be_released) { + std::unique_lock lock(mu_); + // We cannot release more than reserved_threads_ + int released_threads_in_success = + std::min(reserved_threads_, threads_to_be_released); + reserved_threads_ -= released_threads_in_success; + WakeUpAllThreads(); + return released_threads_in_success; + } + private: static void BGThreadWrapper(void* arg); @@ -110,6 +134,16 @@ private: int total_threads_limit_; std::atomic_uint queue_len_; // Queue length. Used for stats reporting + // Number of reserved threads, managed by ReserveThreads(..) and + // ReleaseThreads(..), if num_waiting_threads_ is no larger than + // reserved_threads_, its thread will be blocked to ensure the reservation + // mechanism + int reserved_threads_; + // Number of waiting threads (Maximum number of threads that can be + // reserved), in rare cases, num_waiting_threads_ could be less than + // reserved_threads due to SetBackgroundThreadInternal or last + // excessive threads. + int num_waiting_threads_; bool exit_all_threads_; bool wait_for_jobs_to_complete_; @@ -135,6 +169,8 @@ inline ThreadPoolImpl::Impl::Impl() env_(nullptr), total_threads_limit_(0), queue_len_(), + reserved_threads_(0), + num_waiting_threads_(0), exit_all_threads_(false), wait_for_jobs_to_complete_(false), queue_(), @@ -155,6 +191,8 @@ void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { // prevent threads from being recreated right after they're joined, in case // the user is concurrently submitting jobs. total_threads_limit_ = 0; + reserved_threads_ = 0; + num_waiting_threads_ = 0; lock.unlock(); @@ -189,10 +227,23 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { // Wait until there is an item that is ready to run std::unique_lock lock(mu_); // Stop waiting if the thread needs to do work or needs to terminate. + // Increase num_waiting_threads_ once this task has started waiting + num_waiting_threads_++; + + TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc"); + TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id); + // When not exist_all_threads and the current thread id is not the last + // excessive thread, it may be blocked due to 3 reasons: 1) queue is empty + // 2) it is the excessive thread (not the last one) + // 3) the number of waiting threads is not greater than reserved threads + // (i.e, no available threads due to full reservation") while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && - (queue_.empty() || IsExcessiveThread(thread_id))) { + (queue_.empty() || IsExcessiveThread(thread_id) || + num_waiting_threads_ <= reserved_threads_)) { bgsignal_.wait(lock); } + // Decrease num_waiting_threads_ once the thread is not waiting + num_waiting_threads_--; if (exit_all_threads_) { // mechanism to let BG threads exit safely @@ -209,11 +260,13 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { auto& terminating_thread = bgthreads_.back(); terminating_thread.detach(); bgthreads_.pop_back(); - if (HasExcessiveThread()) { // There is still at least more excessive thread to terminate. WakeUpAllThreads(); } + TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th", + thread_id); + TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination"); break; } @@ -333,7 +386,6 @@ int ThreadPoolImpl::Impl::GetBackgroundThreads() { void ThreadPoolImpl::Impl::StartBGThreads() { // Start background thread if necessary while ((int)bgthreads_.size() < total_threads_limit_) { - port::Thread p_t(&BGThreadWrapper, new BGThreadMetadata(this, bgthreads_.size())); @@ -367,7 +419,7 @@ void ThreadPoolImpl::Impl::Submit(std::function&& schedule, // Add to priority queue queue_.push_back(BGItem()); - + TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue"); auto& item = queue_.back(); item.tag = tag; item.function = std::move(schedule); @@ -498,6 +550,17 @@ void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) { impl_->SetThreadPriority(priority); } +// Reserve a specific number of threads, prevent them from running other +// functions The number of reserved threads could be fewer than the desired one +int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) { + return impl_->ReserveThreads(threads_to_be_reserved); +} + +// Release a specific number of threads +int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) { + return impl_->ReleaseThreads(threads_to_be_released); +} + ThreadPool* NewThreadPool(int num_threads) { ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); thread_pool->SetBackgroundThreads(num_threads); diff --git a/util/threadpool_imp.h b/util/threadpool_imp.h index 0bf0824e4f..e072ae8d30 100644 --- a/util/threadpool_imp.h +++ b/util/threadpool_imp.h @@ -88,6 +88,14 @@ class ThreadPoolImpl : public ThreadPool { // Set the thread priority. void SetThreadPriority(Env::Priority priority); + // Reserve a specific number of threads, prevent them from running other + // functions The number of reserved threads could be fewer than the desired + // one + int ReserveThreads(int threads_to_be_reserved) override; + + // Release a specific number of threads + int ReleaseThreads(int threads_to_be_released) override; + static void PthreadCall(const char* label, int result); struct Impl;