Fix rate limit for MixGraph (#9027)

Summary:
Fix race conditions of the read and write limiters.

Close https://github.com/facebook/rocksdb/issues/8215 .

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9027

Reviewed By: ajkr

Differential Revision: D31645361

Pulled By: zhichao-cao

fbshipit-source-id: 8ea7731991da422eecff2790c1e32db44c751965
This commit is contained in:
Ubuntu 2021-10-14 13:23:01 -07:00 committed by Facebook GitHub Bot
parent e5aa7deae1
commit 140db3c44e

View file

@ -1303,8 +1303,6 @@ DEFINE_double(mix_put_ratio, 0.0,
DEFINE_double(mix_seek_ratio, 0.0,
"The ratio of Seek queries of mix_graph workload");
DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator");
DEFINE_int64(mix_ave_kv_size, 512,
"The average key-value size of this workload");
DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload");
DEFINE_double(
sine_mix_rate_noise, 0.0,
@ -6150,10 +6148,9 @@ class Benchmark {
query.Initiate(ratio);
// the limit of qps initiation
if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) {
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
static_cast<int64_t>(read_rate), 100000 /* refill_period_us */, 10 /* fairness */,
RateLimiter::Mode::kReadsOnly));
if (FLAGS_sine_mix_rate) {
thread->shared->read_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(read_rate)));
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
}
@ -6201,23 +6198,25 @@ class Benchmark {
usecs_since_last = 0;
}
if (usecs_since_last >
(FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
if (FLAGS_sine_mix_rate &&
usecs_since_last >
(FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) {
double usecs_since_start =
static_cast<double>(now - thread->stats.GetStart());
thread->stats.ResetSineInterval();
double mix_rate_with_noise = AddNoise(
SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise);
read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]);
write_rate =
mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size;
write_rate = mix_rate_with_noise * query.ratio_[1];
thread->shared->write_rate_limiter.reset(
NewGenericRateLimiter(static_cast<int64_t>(write_rate)));
thread->shared->read_rate_limiter.reset(NewGenericRateLimiter(
static_cast<int64_t>(read_rate),
FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10,
RateLimiter::Mode::kReadsOnly));
if (read_rate > 0) {
thread->shared->read_rate_limiter->SetBytesPerSecond(
static_cast<int64_t>(read_rate));
}
if (write_rate > 0) {
thread->shared->write_rate_limiter->SetBytesPerSecond(
static_cast<int64_t>(write_rate));
}
}
// Start the query
if (query_type == 0) {
@ -6242,11 +6241,9 @@ class Benchmark {
abort();
}
if (thread->shared->read_rate_limiter.get() != nullptr &&
read % 256 == 255) {
thread->shared->read_rate_limiter->Request(
256, Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kRead);
if (thread->shared->read_rate_limiter && read % 100 == 0) {
thread->shared->read_rate_limiter->Request(100, Env::IO_HIGH,
nullptr /*stats*/);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kRead);
} else if (query_type == 1) {
@ -6267,10 +6264,9 @@ class Benchmark {
ErrorExit();
}
if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
key.size() + val_size, Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
if (thread->shared->write_rate_limiter && puts % 100 == 0) {
thread->shared->write_rate_limiter->Request(100, Env::IO_HIGH,
nullptr /*stats*/);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
} else if (query_type == 2) {