Fix BackupEngine's internal callers of GenericRateLimiter::Request() not honoring bytes <= GetSingleBurstBytes() (#9063)
Summary:

**Context:** Some existing internal calls of `GenericRateLimiter::Request()` in backupable_db.cc, as well as internal calls newly added in https://github.com/facebook/rocksdb/pull/8722/, do not ensure `bytes <= GetSingleBurstBytes()` as required by the rate limiter (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/rate_limiter.h#L47).

**Impacts of this bug include:**
(1) In debug builds, when `GenericRateLimiter::Request()` requests more than `GenericRateLimiter::kMinRefillBytesPerPeriod = 100` bytes, the process crashes on an assertion failure. See https://github.com/facebook/rocksdb/pull/9063#discussion_r737034133 for a possible scenario.
(2) In production builds there is no crash because assertions are disabled, but the bug can cause a request for a small number of bytes to be blocked for a long time by a same-priority request for an extremely large number of bytes issued from a different thread. See the updated https://github.com/facebook/rocksdb/wiki/Rate-Limiter ("Notice that although....the maximum bytes that can be granted in a single request have to be bounded...") for more info.

There is an ongoing effort to move rate limiting to the file wrapper level, so rate limiting in `BackupEngine` and this PR might become obsolete in the future.

**Summary:**
- Implemented loop-calling `GenericRateLimiter::Request()` with `bytes <= GetSingleBurstBytes()` as a static private helper function `BackupEngineImpl::LoopRateLimitRequestHelper`
  -- Consider making this a util function in `RateLimiter` later, or doing something with `RateLimiter::RequestToken()`
- Replaced the buggy internal callers with this helper function wherever the requested bytes are not already limited by `GetSingleBurstBytes()`
- Removed the minimum refill bytes per period enforced by `GenericRateLimiter`, since it is useless and prevents testing `GenericRateLimiter` for the extreme case of a small refill bytes per period

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9063

Test Plan:
- Added a new test that failed the assertion before this change and now passes
- It exposed bugs in [the write during creation in `CopyOrCreateFile()`](df7cc66e17/utilities/backupable/backupable_db.cc (L2034-L2043)), [the read of table properties in `GetFileDbIdentities()`](df7cc66e17/utilities/backupable/backupable_db.cc (L2372-L2378)), and [some reads of metadata in `BackupMeta::LoadFromFile()`](df7cc66e17/utilities/backupable/backupable_db.cc (L2726))
- Existing tests pass

Reviewed By: ajkr

Differential Revision: D31824535

Pulled By: hx235

fbshipit-source-id: d2b3dea7a64e2a4b1e6a59fca322f0800a4fcbcc
This commit is contained in:
parent 2035798834
commit cff7819dff
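For readers skimming the commit message above, here is a minimal standalone sketch of the loop-calling approach it describes: split a large charge into chunks no larger than `GetSingleBurstBytes()` before handing each chunk to `RateLimiter::Request()`. The function name and structure are illustrative only; the committed helper is `BackupEngineImpl::LoopRateLimitRequestHelper`, shown later in this diff.

#include <algorithm>
#include <cstddef>

#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/statistics.h"

// Sketch: charge `total_bytes` against `rate_limiter` without ever passing
// more than GetSingleBurstBytes() to a single Request() call. Assumes
// GetSingleBurstBytes() > 0 so the loop terminates.
void RateLimitInChunks(size_t total_bytes,
                       ROCKSDB_NAMESPACE::RateLimiter* rate_limiter,
                       ROCKSDB_NAMESPACE::Env::IOPriority pri,
                       ROCKSDB_NAMESPACE::Statistics* stats,
                       ROCKSDB_NAMESPACE::RateLimiter::OpType op_type) {
  size_t remaining = total_bytes;
  while (remaining > 0) {
    size_t chunk = std::min(
        static_cast<size_t>(rate_limiter->GetSingleBurstBytes()), remaining);
    rate_limiter->Request(chunk, pri, stats, op_type);
    remaining -= chunk;
  }
}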
HISTORY.md
@@ -19,6 +19,7 @@
### Behavior Changes
* `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files.
* `TransactionUtil::CheckKeyForConflicts` can also perform conflict-checking based on user-defined timestamps in addition to sequence numbers.
+* Removed `GenericRateLimiter`'s minimum refill bytes per period previously enforced.

### Public Interface Change
* When options.ttl is used with leveled compaction with compaction priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly.
@@ -45,6 +46,7 @@
* Fixed a bug where stalled writes would remain stalled forever after the user calls `WriteBufferManager::SetBufferSize()` with `new_size == 0` to dynamically disable memory limiting.
* Make `DB::close()` thread-safe.
* Fix a bug in atomic flush where one bg flush thread will wait forever for a preceding bg flush thread to commit its result to MANIFEST but encounters an error which is mapped to a soft error (DB not stopped).
+* Fix a bug in `BackupEngine` where some internal callers of `GenericRateLimiter::Request()` do not honor `bytes <= GetSingleBurstBytes()`.

### New Features
* Print information about blob files when using "ldb list_live_files_metadata"
util/rate_limiter.cc
@@ -302,8 +302,7 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
    // inaccurate but is a number that is large enough.
    return port::kMaxInt64 / 1000000;
  } else {
-   return std::max(kMinRefillBytesPerPeriod,
-                   rate_bytes_per_sec * refill_period_us_ / 1000000);
+   return rate_bytes_per_sec * refill_period_us_ / 1000000;
  }
}
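As a worked example of the refill arithmetic above (illustrative only, not code from the PR): with `rate_bytes_per_sec = 1'000'000` and the default 100 ms refill period, the refill per period is 1'000'000 * 100'000 / 1'000'000 = 100'000 bytes; with the extreme setting the new test uses (`rate_bytes_per_sec = 1`, `refill_period_us = 1'000'000`), it is exactly 1 byte. The removed `std::max(kMinRefillBytesPerPeriod, ...)` clamp would have silently raised that 1 byte to 100.

#include <cstdint>
#include <iostream>

// Standalone restatement of the post-change formula in
// GenericRateLimiter::CalculateRefillBytesPerPeriod() (non-overflow branch).
int64_t RefillBytesPerPeriod(int64_t rate_bytes_per_sec,
                             int64_t refill_period_us) {
  return rate_bytes_per_sec * refill_period_us / 1000000;
}

int main() {
  std::cout << RefillBytesPerPeriod(1000000, 100000) << "\n";  // 100000
  std::cout << RefillBytesPerPeriod(1, 1000000) << "\n";       // 1
  return 0;
}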
util/rate_limiter.h
@@ -106,8 +106,6 @@ class GenericRateLimiter : public RateLimiter {
  // This mutex guard all internal states
  mutable port::Mutex request_mutex_;

- const int64_t kMinRefillBytesPerPeriod = 100;
-
  const int64_t refill_period_us_;

  int64_t rate_bytes_per_sec_;
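One consequence worth spelling out (my reading of `GenericRateLimiter`, hedged accordingly): `GetSingleBurstBytes()` returns the refill bytes per period, so once the 100-byte floor above is gone, an extreme configuration such as 1 byte/s with a 1 s refill period yields a single burst of just 1 byte. That is what the new test at the bottom of this diff relies on to catch oversized requests. A small sketch:

#include <cassert>
#include <memory>

#include "rocksdb/rate_limiter.h"

int main() {
  // 1 byte/s with a 1 s refill period => refill (and single burst) of 1 byte,
  // now that the kMinRefillBytesPerPeriod floor has been removed.
  std::unique_ptr<ROCKSDB_NAMESPACE::RateLimiter> limiter(
      ROCKSDB_NAMESPACE::NewGenericRateLimiter(
          1 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */));
  assert(limiter->GetSingleBurstBytes() == 1);
  return 0;
}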
utilities/backupable/backupable_db.cc
@@ -35,7 +35,9 @@
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/statistics.h"
#include "rocksdb/transaction_log.h"
#include "table/sst_file_dumper.h"
#include "test_util/sync_point.h"
@@ -232,6 +234,12 @@ class BackupEngineImpl {
    }
  };

+ static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request,
+                                        RateLimiter* rate_limiter,
+                                        const Env::IOPriority pri,
+                                        Statistics* stats,
+                                        const RateLimiter::OpType op_type);
+
  static inline std::string WithoutTrailingSlash(const std::string& path) {
    if (path.empty() || path.back() != '/') {
      return path;
@@ -2022,9 +2030,16 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
      checksum_value = crc32c::Extend(checksum_value, data.data(), data.size());
    }
    io_s = dest_writer->Append(data);

    if (rate_limiter != nullptr) {
+     if (!src.empty()) {
        rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
                              RateLimiter::OpType::kWrite);
+     } else {
+       LoopRateLimitRequestHelper(data.size(), rate_limiter, Env::IO_LOW,
+                                  nullptr /* stats */,
+                                  RateLimiter::OpType::kWrite);
+     }
    }
    while (*bytes_toward_next_callback >=
           options_.callback_trigger_interval_size) {
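The asymmetry in the hunk above follows from how the two paths produce `data` (per the PR summary, only call sites whose request size is not already pre-limited by `GetSingleBurstBytes()` switch to the helper): in the copy path (`!src.empty()`), each `data` chunk comes from a read buffer that is already sized to at most a single burst, so the direct `Request()` stays within the contract; in the create-from-contents path, `data` is the entire contents string and can be arbitrarily large. A hypothetical sketch of the buffer-capping idea (names are illustrative, not the exact code in `CopyOrCreateFile()`):

#include <algorithm>
#include <cstddef>
#include <cstdint>

// Illustrative only: when rate limiting is enabled, cap the copy buffer so
// that no single read or write request can exceed one burst.
size_t ChooseCopyBufferSize(size_t default_copy_file_buffer_size,
                            int64_t single_burst_bytes) {
  if (single_burst_bytes <= 0) {
    return default_copy_file_buffer_size;  // no rate limiter configured
  }
  return std::min(default_copy_file_buffer_size,
                  static_cast<size_t>(single_burst_bytes));
}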
@@ -2358,8 +2373,9 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
        // sizeof(*table_properties) is a sufficient but far-from-exact
        // approximation of read bytes due to metaindex block, std::string
        // properties and varint compression
-       rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW,
-                             nullptr /* stats */, RateLimiter::OpType::kRead);
+       LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter,
+                                  Env::IO_LOW, nullptr /* stats */,
+                                  RateLimiter::OpType::kRead);
      }
    }
  } else {
@@ -2388,6 +2404,22 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
  }
}

+void BackupEngineImpl::LoopRateLimitRequestHelper(
+    const size_t total_bytes_to_request, RateLimiter* rate_limiter,
+    const Env::IOPriority pri, Statistics* stats,
+    const RateLimiter::OpType op_type) {
+  assert(rate_limiter != nullptr);
+  size_t remaining_bytes = total_bytes_to_request;
+  size_t request_bytes = 0;
+  while (remaining_bytes > 0) {
+    request_bytes =
+        std::min(static_cast<size_t>(rate_limiter->GetSingleBurstBytes()),
+                 remaining_bytes);
+    rate_limiter->Request(request_bytes, pri, stats, op_type);
+    remaining_bytes -= request_bytes;
+  }
+}
+
void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                      uint32_t file_type_filter) const {
  std::vector<std::string> children;
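For intuition about the cost of the helper just added (a back-of-the-envelope sketch, not code from the PR): it issues ceil(total_bytes_to_request / GetSingleBurstBytes()) calls to `Request()`, and it terminates as long as `GetSingleBurstBytes()` is positive.

#include <cstdint>

// How many Request() calls the chunking loop issues for a given charge and
// single-burst size (assumes single_burst_bytes >= 1).
uint64_t NumRequestCalls(uint64_t total_bytes, uint64_t single_burst_bytes) {
  return (total_bytes + single_burst_bytes - 1) / single_burst_bytes;
}
// e.g. NumRequestCalls(10, 1) == 10; NumRequestCalls(10, 4) == 3.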
@@ -2716,7 +2748,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
  std::string line;
  if (backup_meta_reader->ReadLine(&line)) {
    if (rate_limiter != nullptr) {
-     rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                           RateLimiter::OpType::kRead);
+     LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                nullptr /* stats */,
+                                RateLimiter::OpType::kRead);
    }
    if (StartsWith(line, kSchemaVersionPrefix)) {
@@ -2736,14 +2769,16 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  } else if (backup_meta_reader->ReadLine(&line)) {
    if (rate_limiter != nullptr) {
-     rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                           RateLimiter::OpType::kRead);
+     LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                nullptr /* stats */,
+                                RateLimiter::OpType::kRead);
    }
    timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
  }
  if (backup_meta_reader->ReadLine(&line)) {
    if (rate_limiter != nullptr) {
-     rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                           RateLimiter::OpType::kRead);
+     LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                nullptr /* stats */,
+                                RateLimiter::OpType::kRead);
    }
    sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
@@ -2751,7 +2786,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
  uint32_t num_files = UINT32_MAX;
  while (backup_meta_reader->ReadLine(&line)) {
    if (rate_limiter != nullptr) {
-     rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                           RateLimiter::OpType::kRead);
+     LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                nullptr /* stats */,
+                                RateLimiter::OpType::kRead);
    }
    if (line.empty()) {
@@ -2794,7 +2830,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
  bool footer_present = false;
  while (backup_meta_reader->ReadLine(&line)) {
    if (rate_limiter != nullptr) {
-     rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                           RateLimiter::OpType::kRead);
+     LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                nullptr /* stats */,
+                                RateLimiter::OpType::kRead);
    }
    std::vector<std::string> components = StringSplit(line, ' ');
@@ -2885,7 +2922,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
      assert(schema_major_version >= 2);
      while (backup_meta_reader->ReadLine(&line)) {
        if (rate_limiter != nullptr) {
-         rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                               RateLimiter::OpType::kRead);
+         LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                    nullptr /* stats */,
+                                    RateLimiter::OpType::kRead);
        }
        if (line.empty()) {
utilities/backupable/backupable_db_test.cc
@@ -13,20 +13,25 @@

#include <algorithm>
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "env/env_chroot.h"
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/statistics.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/options_util.h"
@@ -36,6 +41,7 @@
#include "util/cast_util.h"
#include "util/mutexlock.h"
#include "util/random.h"
+#include "util/rate_limiter.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "utilities/backupable/backupable_db_impl.h"
@@ -2803,6 +2809,119 @@ TEST_P(BackupEngineRateLimitingTestWithParam,
  CloseDBAndBackupEngine();
  DestroyDB(dbname_, Options());
}
+
+class BackupEngineRateLimitingTestWithParam2
+    : public BackupEngineTest,
+      public testing::WithParamInterface<
+          std::tuple<std::pair<uint64_t, uint64_t> /* limits */>> {
+ public:
+  BackupEngineRateLimitingTestWithParam2() {}
+};
+
+INSTANTIATE_TEST_CASE_P(
+    LowRefillBytesPerPeriod, BackupEngineRateLimitingTestWithParam2,
+    ::testing::Values(std::make_tuple(std::make_pair(1, 1))));
+
+// To verify we don't request over-sized bytes relative to
+// refill_bytes_per_period_ in each RateLimiter::Request() called in
+// BackupEngine through verifying we don't trigger assertion
+// failure on over-sized request in GenericRateLimiter in debug builds
+TEST_P(BackupEngineRateLimitingTestWithParam2,
+       RateLimitingWithLowRefillBytesPerPeriod) {
+  SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true);
+
+  backupable_options_->max_background_operations = 1;
+  const uint64_t backup_rate_limiter_limit = std::get<0>(GetParam()).first;
+  std::shared_ptr<RateLimiter> backup_rate_limiter(
+      std::make_shared<GenericRateLimiter>(
+          backup_rate_limiter_limit, 1000 * 1000 /* refill_period_us */,
+          10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */,
+          special_env.GetSystemClock(), false /* auto_tuned */));
+  backupable_options_->backup_rate_limiter = backup_rate_limiter;
+
+  const uint64_t restore_rate_limiter_limit = std::get<0>(GetParam()).second;
+  std::shared_ptr<RateLimiter> restore_rate_limiter(
+      std::make_shared<GenericRateLimiter>(
+          restore_rate_limiter_limit, 1000 * 1000 /* refill_period_us */,
+          10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */,
+          special_env.GetSystemClock(), false /* auto_tuned */));
+  backupable_options_->restore_rate_limiter = restore_rate_limiter;
+
+  // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the
+  // `Env` to advance its time according to the fake wait duration. The
+  // workaround is to install a callback that advances the `Env`'s mock time.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) {
+        int64_t time_waited_us = *static_cast<int64_t*>(arg);
+        special_env.SleepForMicroseconds(static_cast<int>(time_waited_us));
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  DestroyDB(dbname_, Options());
+  OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
+                        kShareWithChecksum /* shared_option */);
+
+  FillDB(db_.get(), 0, 100);
+  int64_t total_bytes_through_before_backup =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  EXPECT_OK(backup_engine_->CreateNewBackup(db_.get(),
+                                            false /* flush_before_backup */));
+  int64_t total_bytes_through_after_backup =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  ASSERT_GT(total_bytes_through_after_backup,
+            total_bytes_through_before_backup);
+
+  std::vector<BackupInfo> backup_infos;
+  BackupInfo backup_info;
+  backup_engine_->GetBackupInfo(&backup_infos);
+  ASSERT_EQ(1, backup_infos.size());
+  const int backup_id = 1;
+  ASSERT_EQ(backup_id, backup_infos[0].backup_id);
+  ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info,
+                                          true /* include_file_details */));
+  int64_t total_bytes_through_before_verify_backup =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  EXPECT_OK(
+      backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */));
+  int64_t total_bytes_through_after_verify_backup =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  ASSERT_GT(total_bytes_through_after_verify_backup,
+            total_bytes_through_before_verify_backup);
+
+  CloseDBAndBackupEngine();
+  AssertBackupConsistency(backup_id, 0, 100, 101);
+
+  int64_t total_bytes_through_before_initialize =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  OpenDBAndBackupEngine(false /* destroy_old_data */);
+  // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile,
+  // which is called in BackupEngineImpl::Initialize() during
+  // OpenBackupEngine(false)
+  int64_t total_bytes_through_after_initialize =
+      backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
+  ASSERT_GT(total_bytes_through_after_initialize,
+            total_bytes_through_before_initialize);
+  CloseDBAndBackupEngine();
+
+  DestroyDB(dbname_, Options());
+  OpenBackupEngine(false /* destroy_old_data */);
+  int64_t total_bytes_through_before_restore =
+      backupable_options_->restore_rate_limiter->GetTotalBytesThrough();
+  EXPECT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
+  int64_t total_bytes_through_after_restore =
+      backupable_options_->restore_rate_limiter->GetTotalBytesThrough();
+  ASSERT_GT(total_bytes_through_after_restore,
+            total_bytes_through_before_restore);
+  CloseBackupEngine();
+
+  DestroyDB(dbname_, Options());
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::Request:PostTimedWait");
+}
+
#endif  // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)

TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {