From 82b81dc8b5e5370d1c77c860da9b518b5a7681ad Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Mon, 9 Aug 2021 16:46:14 -0700 Subject: [PATCH] Simplify GenericRateLimiter algorithm (#8602) Summary: `GenericRateLimiter` slow path handles requests that cannot be satisfied immediately. Such requests enter a queue, and their thread stays in `Request()` until they are granted or the rate limiter is stopped. These threads are responsible for unblocking themselves. The work to do so is split into two main duties. (1) Waiting for the next refill time. (2) Refilling the bytes and granting requests. Prior to this PR, the slow path logic involved a leader election algorithm to pick one thread to perform (1) followed by (2). It elected the thread whose request was at the front of the highest priority non-empty queue since that request was most likely to be granted. This algorithm was efficient in terms of reducing intermediate wakeups, which is a thread waking up only to resume waiting after finding its request is not granted. However, the conceptual complexity of this algorithm was too high. It took me a long time to draw a timeline to understand how it works for just one edge case yet there were so many. This PR drops the leader election to reduce conceptual complexity. Now, the two duties can be performed by whichever thread acquires the lock first. The risk of this change is increasing the number of intermediate wakeups, however, we took steps to mitigate that. - `wait_until_refill_pending_` flag ensures only one thread performs (1). This\ prevents the thundering herd problem at the next refill time. The remaining\ threads wait on their condition variable with an unbounded duration -- thus we\ must remember to notify them to ensure forward progress. - (1) is typically done by a thread at the front of a queue. This is trivial\ when the queues are initially empty as the first choice that arrives must be\ the only entry in its queue. When queues are initially non-empty, we achieve\ this by having (2) notify a thread at the front of a queue (preferring higher\ priority) to perform the next duty. - We do not require any additional wakeup for (2). Typically it will just be\ done by the thread that finished (1). Combined, the second and third bullet points above suggest the refill/granting will typically be done by a request at the front of its queue. This is important because one wakeup is saved when a granted request happens to be in an already running thread. Note there are a few cases that still lead to intermediate wakeup, however. The first two are existing issues that also apply to the old algorithm, however, the third (including both subpoints) is new. - No request may be granted (only possible when rate limit dynamically\ decreases). - Requests from a different queue may be granted. - (2) may be run by a non-front request thread causing it to not be granted even\ if some requests in that same queue are granted. It can happen for a couple\ (unlikely) reasons. - A new request may sneak in and grab the lock at the refill time, before the\ thread finishing (1) can wake up and grab it. - A new request may sneak in and grab the lock and execute (1) before (2)'s\ chosen candidate can wake up and grab the lock. Then that non-front request\ thread performing (1) can carry over to perform (2). Pull Request resolved: https://github.com/facebook/rocksdb/pull/8602 Test Plan: - Use existing tests. The edge cases listed in the comment are all performance\ related; I could not really think of any related to correctness. The logic\ looks the same whether a thread wakes up/finishes its work early/on-time/late,\ or whether the thread is chosen vs. "steals" the work. - Verified write throughput and CPU overhead are basically the same with and\ without this change, even in a rate limiter heavy workload: Test command: ``` $ rm -rf /dev/shm/dbbench/ && TEST_TMPDIR=/dev/shm /usr/bin/time ./db_bench -benchmarks=fillrandom -num_multi_db=64 -num_low_pri_threads=64 -num_high_pri_threads=64 -write_buffer_size=262144 -target_file_size_base=262144 -max_bytes_for_level_base=1048576 -rate_limiter_bytes_per_sec=16777216 -key_size=24 -value_size=1000 -num=10000 -compression_type=none -rate_limiter_refill_period_us=1000 ``` Results before this PR: ``` fillrandom : 108.463 micros/op 9219 ops/sec; 9.0 MB/s 7.40user 8.84system 1:26.20elapsed 18%CPU (0avgtext+0avgdata 256140maxresident)k ``` Results after this PR: ``` fillrandom : 108.108 micros/op 9250 ops/sec; 9.0 MB/s 7.45user 8.23system 1:26.68elapsed 18%CPU (0avgtext+0avgdata 255688maxresident)k ``` Reviewed By: hx235 Differential Revision: D30048013 Pulled By: ajkr fbshipit-source-id: 6741bba9d9dfbccab359806d725105817fef818b --- tools/db_bench_tool.cc | 6 +- util/rate_limiter.cc | 196 +++++++++++--------------------------- util/rate_limiter.h | 2 +- util/rate_limiter_test.cc | 14 ++- 4 files changed, 70 insertions(+), 148 deletions(-) diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index bb69878a15..17be134cb7 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1217,6 +1217,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); +DEFINE_int64(rate_limiter_refill_period_us, 100 * 1000, + "Set refill period on " + "rate limiter."); + DEFINE_bool(rate_limiter_auto_tuned, false, "Enable dynamic adjustment of rate limit according to demand for " "background I/O"); @@ -4443,7 +4447,7 @@ class Benchmark { exit(1); } options.rate_limiter.reset(NewGenericRateLimiter( - FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */, + FLAGS_rate_limiter_bytes_per_sec, FLAGS_rate_limiter_refill_period_us, 10 /* fairness */, FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly : RateLimiter::Mode::kWritesOnly, diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 70bdac0265..a98400a9b4 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -62,7 +62,7 @@ GenericRateLimiter::GenericRateLimiter( next_refill_us_(NowMicrosMonotonic()), fairness_(fairness > 100 ? 100 : fairness), rnd_((uint32_t)time(nullptr)), - leader_(nullptr), + wait_until_refill_pending_(false), auto_tuned_(auto_tuned), num_drains_(0), prev_num_drains_(0), @@ -139,148 +139,70 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, Req r(bytes, &request_mutex_); queue_[pri].push_back(&r); + // A thread representing a queued request coordinates with other such threads. + // There are two main duties. + // + // (1) Waiting for the next refill time. + // (2) Refilling the bytes and granting requests. do { - bool timedout = false; - - // Leader election: - // Leader request's duty: - // (1) Waiting for the next refill time; - // (2) Refilling the bytes and granting requests. - // - // If the following three conditions are all true for a request, - // then the request is selected as a leader: - // (1) The request thread acquired the request_mutex_ and is running; - // (2) There is currently no leader; - // (3) The request sits at the front of a queue. - // - // If not selected as a leader, the request thread will wait - // for one of the following signals to wake up and - // compete for the request_mutex_: - // (1) Signal from the previous leader to exit since its requested bytes - // are fully granted; - // (2) Signal from the previous leader to particpate in next-round - // leader election; - // (3) Signal from rate limiter's destructor as part of the clean-up. - // - // Therefore, a leader request can only be one of the following types: - // (1) a new incoming request placed at the front of a queue; - // (2) a previous leader request whose quota has not been not fully - // granted yet due to its lower priority, hence still at - // the front of a queue; - // (3) a waiting request at the front of a queue, which got - // signaled by the previous leader to participate in leader election. - if (leader_ == nullptr && - ((!queue_[Env::IO_HIGH].empty() && - &r == queue_[Env::IO_HIGH].front()) || - (!queue_[Env::IO_LOW].empty() && - &r == queue_[Env::IO_LOW].front()))) { - leader_ = &r; - - int64_t delta = next_refill_us_ - NowMicrosMonotonic(); - delta = delta > 0 ? delta : 0; - if (delta == 0) { - timedout = true; + int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonic(); + if (time_until_refill_us > 0) { + if (wait_until_refill_pending_) { + // Somebody is performing (1). Trust we'll be woken up when our request + // is granted or we are needed for future duties. + r.cv.Wait(); } else { - // The leader request thread waits till next_refill_us_ - int64_t wait_until = clock_->NowMicros() + delta; + // Whichever thread reaches here first performs duty (1) as described + // above. + int64_t wait_until = clock_->NowMicros() + time_until_refill_us; RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); ++num_drains_; - timedout = r.cv.TimedWait(wait_until); + wait_until_refill_pending_ = true; + r.cv.TimedWait(wait_until); + TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait", + &time_until_refill_us); + wait_until_refill_pending_ = false; } } else { - r.cv.Wait(); - } - - if (stop_) { - // It is now in the clean-up of ~GenericRateLimiter(). - // Therefore any woken-up request will exit here, - // might or might not has been satiesfied. - --requests_to_wait_; - exit_cv_.Signal(); - return; - } - - // Assertion: request thread running through this point is one of the - // following in terms of the request type and quota granting situation: - // (1) a leader request that is not fully granted with quota and about - // to carry out its leader's work; - // (2) a non-leader request that got fully granted with quota and is - // running to exit; - // (3) a non-leader request that is not fully granted with quota and - // is running to particpate in next-round leader election. - assert((&r == leader_ && !r.granted) || (&r != leader_ && r.granted) || - (&r != leader_ && !r.granted)); - - // Assertion: request thread running through this point is one of the - // following in terms of its position in queue: - // (1) a request got popped off the queue because it is fully granted - // with bytes; - // (2) a request sits at the front of its queue. - assert(r.granted || - (!queue_[Env::IO_HIGH].empty() && - &r == queue_[Env::IO_HIGH].front()) || - (!queue_[Env::IO_LOW].empty() && - &r == queue_[Env::IO_LOW].front())); - - if (leader_ == &r) { - // The leader request thread is now running. - // It might or might not has been TimedWait(). - if (timedout) { - // Time for the leader to do refill and grant bytes to requests - RefillBytesAndGrantRequests(); - - // The leader request retires after refilling and granting bytes - // regardless. This is to simplify the election handling. - leader_ = nullptr; - - if (r.granted) { - // The leader request (that was just retired) - // already got fully granted with quota and will soon exit - - // Assertion: the fully granted leader request is popped off its queue - assert((queue_[Env::IO_HIGH].empty() || - &r != queue_[Env::IO_HIGH].front()) && - (queue_[Env::IO_LOW].empty() || - &r != queue_[Env::IO_LOW].front())); - - // If there is any remaining requests, the leader request (that was - // just retired) makes sure there exists at least one leader candidate - // by signaling a front request of a queue to particpate in - // next-round leader election - if (!queue_[Env::IO_HIGH].empty()) { - queue_[Env::IO_HIGH].front()->cv.Signal(); - } else if (!queue_[Env::IO_LOW].empty()) { - queue_[Env::IO_LOW].front()->cv.Signal(); - } - - // The leader request (that was just retired) exits - break; - } else { - // The leader request (that was just retired) is not fully granted - // with quota. It will particpate in leader election and claim back - // the leader position immediately. - assert(!r.granted); + // Whichever thread reaches here first performs duty (2) as described + // above. + RefillBytesAndGrantRequests(); + if (r.granted) { + // If there is any remaining requests, make sure there exists at least + // one candidate is awake for future duties by signaling a front request + // of a queue. + if (!queue_[Env::IO_HIGH].empty()) { + queue_[Env::IO_HIGH].front()->cv.Signal(); + } else if (!queue_[Env::IO_LOW].empty()) { + queue_[Env::IO_LOW].front()->cv.Signal(); } - } else { - // Spontaneous wake up, need to continue to wait - assert(!r.granted); - leader_ = nullptr; } - } else { - // The non-leader request thread is running. - // It is one of the following request types: - // (1) The request got fully granted with quota and signaled to run to - // exit by the previous leader; - // (2) The request is not fully granted with quota and signaled to run to - // particpate in next-round leader election by the previous leader. - // It might or might not become the next-round leader because a new - // request may come in and acquire the request_mutex_ before this - // request thread does after it was signaled. The new request might - // sit at front of a queue and hence become the next-round leader - // instead. - assert(&r != leader_); } - } while (!r.granted); + // Invariant: non-granted request is always in one queue, and granted + // request is always in zero queues. +#ifndef NDEBUG + int num_found = 0; + for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { + if (std::find(queue_[i].begin(), queue_[i].end(), &r) != + queue_[i].end()) { + ++num_found; + } + } + if (r.granted) { + assert(num_found == 0); + } else { + assert(num_found == 1); + } +#endif // NDEBUG + } while (!stop_ && !r.granted); + + if (stop_) { + // It is now in the clean-up of ~GenericRateLimiter(). + // Therefore any woken-up request will have come out of the loop and then + // exit here. It might or might not have been satisfied. + --requests_to_wait_; + exit_cv_.Signal(); + } } void GenericRateLimiter::RefillBytesAndGrantRequests() { @@ -314,10 +236,8 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() { queue->pop_front(); next_req->granted = true; - if (next_req != leader_) { - // Quota granted, signal the thread to exit - next_req->cv.Signal(); - } + // Quota granted, signal the thread to exit + next_req->cv.Signal(); } } } diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 58342a097b..aa5e6e5e1b 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -101,8 +101,8 @@ class GenericRateLimiter : public RateLimiter { Random rnd_; struct Req; - Req* leader_; std::deque queue_[Env::IO_TOTAL]; + bool wait_until_refill_pending_; bool auto_tuned_; int64_t num_drains_; diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 04625964c6..a4b1de2f42 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -211,15 +211,13 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), true /* auto_tuned */)); - // Use callback to advance time because we need to advance (1) after Request() - // has determined the bytes are not available; and (2) before - // RefillBytesAndGrantRequests() computes the next refill time (ensuring - // refill time in the future allows the next request to drain the rate - // limiter). + // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the + // `Env` to advance its time according to the fake wait duration. The + // workaround is to install a callback that advance the `Env`'s mock time. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "GenericRateLimiter::RefillBytesAndGrantRequests", [&](void* /*arg*/) { - special_env.SleepForMicroseconds(static_cast( - std::chrono::microseconds(kTimePerRefill).count())); + "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { + int64_t time_waited_us = *static_cast(arg); + special_env.SleepForMicroseconds(static_cast(time_waited_us)); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();