mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-02 10:15:54 +00:00
d9e71fb2c5
Summary: Timer has a limitation that it cannot re-register a task with the same name, because the cancel only mark the task as invalid and wait for the Timer thread to clean it up later, before the task is cleaned up, the same task name cannot be added. Which makes the task option update likely to fail, which basically cancel and re-register the same task name. Change the periodic task name to a random unique id and store it in periodic_task_scheduler. Also refactor the `periodic_work` to `periodic_task` to make each job function as a `task`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10379 Test Plan: unittests Reviewed By: ajkr Differential Revision: D38000615 Pulled By: jay-zhuang fbshipit-source-id: e4135f9422e3b53aaec8eda54f4e18ce633a279e
114 lines
3.8 KiB
C++
114 lines
3.8 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
//
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "db/periodic_task_scheduler.h"
|
|
|
|
#include "rocksdb/system_clock.h"
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// `timer_mutex` is a global mutex serves 3 purposes currently:
|
|
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
|
|
// they are currently not implemented in a thread-safe way; and
|
|
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
|
|
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
|
|
// (3) protect tasks_map_ in PeriodicTaskScheduler
|
|
// Note: It's not efficient to have a static global mutex, for
|
|
// PeriodicTaskScheduler it should be okay, as the operations are called
|
|
// infrequently.
|
|
static port::Mutex timer_mutex;
|
|
|
|
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
|
|
{PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
|
|
{PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
|
|
{PeriodicTaskType::kFlushInfoLog, 10},
|
|
{PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
|
|
};
|
|
|
|
static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
|
|
{PeriodicTaskType::kDumpStats, "dump_st"},
|
|
{PeriodicTaskType::kPersistStats, "pst_st"},
|
|
{PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
|
|
{PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
|
|
};
|
|
|
|
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
|
|
const PeriodicTaskFunc& fn) {
|
|
return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type));
|
|
}
|
|
|
|
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
|
|
const PeriodicTaskFunc& fn,
|
|
uint64_t repeat_period_seconds) {
|
|
MutexLock l(&timer_mutex);
|
|
static std::atomic<uint64_t> initial_delay(0);
|
|
|
|
if (repeat_period_seconds == kInvalidPeriodSec) {
|
|
return Status::InvalidArgument("Invalid task repeat period");
|
|
}
|
|
auto it = tasks_map_.find(task_type);
|
|
if (it != tasks_map_.end()) {
|
|
// the task already exists and it's the same, no update needed
|
|
if (it->second.repeat_every_sec == repeat_period_seconds) {
|
|
return Status::OK();
|
|
}
|
|
// cancel the existing one before register new one
|
|
timer_->Cancel(it->second.name);
|
|
tasks_map_.erase(it);
|
|
}
|
|
|
|
timer_->Start();
|
|
// put task type name as prefix, for easy debug
|
|
std::string unique_id =
|
|
kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++);
|
|
|
|
bool succeeded = timer_->Add(
|
|
fn, unique_id,
|
|
(initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond,
|
|
repeat_period_seconds * kMicrosInSecond);
|
|
if (!succeeded) {
|
|
return Status::Aborted("Failed to register periodic task");
|
|
}
|
|
auto result = tasks_map_.try_emplace(
|
|
task_type, TaskInfo{unique_id, repeat_period_seconds});
|
|
if (!result.second) {
|
|
return Status::Aborted("Failed to add periodic task");
|
|
};
|
|
return Status::OK();
|
|
}
|
|
|
|
Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
|
|
MutexLock l(&timer_mutex);
|
|
auto it = tasks_map_.find(task_type);
|
|
if (it != tasks_map_.end()) {
|
|
timer_->Cancel(it->second.name);
|
|
tasks_map_.erase(it);
|
|
}
|
|
if (!timer_->HasPendingTask()) {
|
|
timer_->Shutdown();
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Timer* PeriodicTaskScheduler::Default() {
|
|
static Timer timer(SystemClock::Default().get());
|
|
return &timer;
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
|
|
static Timer test_timer(clock);
|
|
test_timer.TEST_OverrideTimer(clock);
|
|
MutexLock l(&timer_mutex);
|
|
timer_ = &test_timer;
|
|
}
|
|
#endif // NDEBUG
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // ROCKSDB_LITE
|