rocksdb/db/periodic_task_scheduler.cc
Jay Zhuang d9e71fb2c5 Fix periodic_task unable to re-register the same task type (#10379)
Summary:
Timer has a limitation that it cannot re-register a task with the same name,
because the cancel only mark the task as invalid and wait for the Timer thread
to clean it up later, before the task is cleaned up, the same task name cannot
be added. Which makes the task option update likely to fail, which basically
cancel and re-register the same task name. Change the periodic task name to a
random unique id and store it in periodic_task_scheduler.

Also refactor the `periodic_work` to `periodic_task` to make each job function
as a `task`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10379

Test Plan: unittests

Reviewed By: ajkr

Differential Revision: D38000615

Pulled By: jay-zhuang

fbshipit-source-id: e4135f9422e3b53aaec8eda54f4e18ce633a279e
2022-08-25 18:52:37 -07:00

114 lines
3.8 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/periodic_task_scheduler.h"
#include "rocksdb/system_clock.h"
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {
// `timer_mutex` is a global mutex serves 3 purposes currently:
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
// they are currently not implemented in a thread-safe way; and
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
// the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically.
// (3) protect tasks_map_ in PeriodicTaskScheduler
// Note: It's not efficient to have a static global mutex, for
// PeriodicTaskScheduler it should be okay, as the operations are called
// infrequently.
static port::Mutex timer_mutex;
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
{PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
{PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
{PeriodicTaskType::kFlushInfoLog, 10},
{PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
};
static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
{PeriodicTaskType::kDumpStats, "dump_st"},
{PeriodicTaskType::kPersistStats, "pst_st"},
{PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
{PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
};
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
const PeriodicTaskFunc& fn) {
return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type));
}
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
const PeriodicTaskFunc& fn,
uint64_t repeat_period_seconds) {
MutexLock l(&timer_mutex);
static std::atomic<uint64_t> initial_delay(0);
if (repeat_period_seconds == kInvalidPeriodSec) {
return Status::InvalidArgument("Invalid task repeat period");
}
auto it = tasks_map_.find(task_type);
if (it != tasks_map_.end()) {
// the task already exists and it's the same, no update needed
if (it->second.repeat_every_sec == repeat_period_seconds) {
return Status::OK();
}
// cancel the existing one before register new one
timer_->Cancel(it->second.name);
tasks_map_.erase(it);
}
timer_->Start();
// put task type name as prefix, for easy debug
std::string unique_id =
kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++);
bool succeeded = timer_->Add(
fn, unique_id,
(initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond,
repeat_period_seconds * kMicrosInSecond);
if (!succeeded) {
return Status::Aborted("Failed to register periodic task");
}
auto result = tasks_map_.try_emplace(
task_type, TaskInfo{unique_id, repeat_period_seconds});
if (!result.second) {
return Status::Aborted("Failed to add periodic task");
};
return Status::OK();
}
Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
MutexLock l(&timer_mutex);
auto it = tasks_map_.find(task_type);
if (it != tasks_map_.end()) {
timer_->Cancel(it->second.name);
tasks_map_.erase(it);
}
if (!timer_->HasPendingTask()) {
timer_->Shutdown();
}
return Status::OK();
}
Timer* PeriodicTaskScheduler::Default() {
static Timer timer(SystemClock::Default().get());
return &timer;
}
#ifndef NDEBUG
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
static Timer test_timer(clock);
test_timer.TEST_OverrideTimer(clock);
MutexLock l(&timer_mutex);
timer_ = &test_timer;
}
#endif // NDEBUG
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE