rocksdb/util/timer.h
Jay Zhuang a3acf2ef87 Add seqno to time mapping (#10338)
Summary:
This will be used by tiered storage to preclude hot data from being
compacted to the cold tier (the last level).
Internally, this adds a seqno-to-time mapping. A periodic_task is scheduled
to record the current_seqno -> current_time mapping at a certain cadence. When
a memtable is flushed, the mapping information is stored in an sstable
property. During compaction, the mapping information is merged and used to
get the approximate time of a sequence number, which determines whether a key
was recently inserted; if it was (within `preclude_last_level_data_seconds`),
the key is precluded from the last level.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10338

Test Plan: CI

Reviewed By: siying

Differential Revision: D37810187

Pulled By: jay-zhuang

fbshipit-source-id: 6953be7a18a99de8b1cb3b162d712f79c2b4899f
2022-07-14 21:49:34 -07:00

347 lines
10 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <algorithm>
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "monitoring/instrumented_mutex.h"
#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
// A Timer class to handle repeated work.
//
// `Start()` and `Shutdown()` are currently not thread-safe. The client must
// serialize calls to these two member functions.
//
// A single timer instance can handle multiple functions via a single thread.
// It is better to leave long running work to a dedicated thread pool.
//
// Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
// Work (in terms of a `void function`) can be scheduled by calling `Add` with
// a unique function name and de-scheduled by calling `Cancel`.
// Many functions can be added.
//
// Scheduled functions run on the timer thread with `mutex_` released. A
// function must not call `Cancel()` on its own name, `CancelAll()` or
// `Shutdown()` from inside itself: each of those waits for the running
// function to return, so doing so deadlocks.
//
// Impl Details:
// A heap is used to keep track of when the next timer goes off.
// A map from a function name to the function keeps track of all the functions.
// The map owns every FunctionInfo; the heap only holds non-owning pointers into
// it. Both are guarded by `mutex_`.
class Timer {
 public:
  explicit Timer(SystemClock* clock)
      : clock_(clock),
        mutex_(clock),
        cond_var_(&mutex_),
        running_(false),
        executing_task_(false) {}

  // The timer owns a background thread and a mutex; copying makes no sense.
  Timer(const Timer&) = delete;
  Timer& operator=(const Timer&) = delete;

  ~Timer() { Shutdown(); }

  // Add a new function to run.
  // fn_name has to be unique: if a function with the same name is already
  // registered (including one that was cancelled but not yet removed by the
  // timer thread), nothing is added and false is returned.
  // start_after_us is the initial delay.
  // repeat_every_us is the interval between ending time of the last call and
  // starting time of the next call. For example, repeat_every_us = 2000 and
  // the function takes 1000us to run. If it starts at time [now]us, then it
  // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
  // repeat_every_us == 0 means do not repeat.
  // Also returns false, without adding, if a task is currently executing and
  // the new function's first run time would precede that task's scheduled
  // time. Returns true once the function has been scheduled.
  bool Add(std::function<void()> fn, const std::string& fn_name,
           uint64_t start_after_us, uint64_t repeat_every_us) {
    auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
                                                  repeat_every_us);
    InstrumentedMutexLock l(&mutex_);
    // Assign time within mutex to make sure the next_run_time is larger than
    // the current running one.
    fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
    // The new task's start time should never be before the executing task's
    // scheduled time, as the executing task can only be running once its
    // next_run_time_us is due (<= clock_->NowMicros()). Refusing such a task
    // keeps the executing task at the top of the heap, which Run() asserts.
    if (executing_task_ &&
        fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
      return false;
    }
    // Single lookup: try_emplace does nothing if the name is already taken.
    auto [it, inserted] = map_.try_emplace(fn_name, std::move(fn_info));
    if (!inserted) {
      // Timer doesn't support duplicated function names.
      return false;
    }
    heap_.push(it->second.get());
    cond_var_.SignalAll();
    return true;
  }

  // Cancels the function named fn_name so that it will not run (again).
  // If fn_name is currently executing, blocks until that run has returned.
  // Unknown names are ignored. The cancelled entry itself is reaped later by
  // the timer thread, or by CancelAll()/Shutdown().
  void Cancel(const std::string& fn_name) {
    InstrumentedMutexLock l(&mutex_);
    // Mark the function with fn_name as invalid so that it will not be
    // requeued.
    auto it = map_.find(fn_name);
    if (it != map_.end() && it->second) {
      it->second->Cancel();
    }
    // If the currently running function is fn_name, wait until it finishes
    // before returning to the caller. An executing task always stays at the
    // top of the heap (see Add()), so inspecting the top is sufficient; once
    // the wait returns no task is executing, so one check is enough.
    if (executing_task_ && !heap_.empty()) {
      FunctionInfo* func_info = heap_.top();
      assert(func_info);
      if (func_info->name == fn_name) {
        WaitForTaskCompleteIfNecessary();
      }
    }
  }

  // Cancels every registered function, waits for a currently executing one to
  // return, then drops all of them.
  void CancelAll() {
    InstrumentedMutexLock l(&mutex_);
    CancelAllWithLock();
  }

  // Starts the background thread that runs scheduled functions.
  // Returns false (and does nothing) if the timer is already running.
  bool Start() {
    InstrumentedMutexLock l(&mutex_);
    if (running_) {
      return false;
    }
    running_ = true;
    thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
    return true;
  }

  // Stops the timer: cancels and drops every function (waiting for a currently
  // executing one to return), wakes the timer thread and joins it.
  // Returns false (and does nothing) if the timer is not running.
  bool Shutdown() {
    {
      InstrumentedMutexLock l(&mutex_);
      if (!running_) {
        return false;
      }
      running_ = false;
      CancelAllWithLock();
      cond_var_.SignalAll();
    }
    if (thread_) {
      thread_->join();
    }
    return true;
  }

  // Returns true if at least one registered function has not been cancelled.
  bool HasPendingTask() const {
    InstrumentedMutexLock l(&mutex_);
    return std::any_of(map_.begin(), map_.end(), [](const auto& entry) {
      return entry.second->IsValid();
    });
  }

#ifndef NDEBUG
  // Wait until Timer starting waiting, call the optional callback, then wait
  // for Timer waiting again.
  // Tests can provide a custom Clock object to mock time, and use the callback
  // here to bump current time and trigger Timer. See timer_test for example.
  //
  // Note: only support one caller of this method.
  void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
    InstrumentedMutexLock l(&mutex_);
    // True while a task is executing or the earliest task is already due,
    // i.e. while the timer thread still has work to do right now.
    auto timer_busy = [this]() {
      return executing_task_ ||
             (!heap_.empty() &&
              heap_.top()->next_run_time_us <= clock_->NowMicros());
    };
    // Poll with 1000us timed waits (acts like a spin lock) until idle.
    while (timer_busy()) {
      cond_var_.TimedWait(clock_->NowMicros() + 1000);
    }
    if (callback != nullptr) {
      callback();
    }
    cond_var_.SignalAll();
    do {
      cond_var_.TimedWait(clock_->NowMicros() + 1000);
    } while (timer_busy());
  }

  // Number of registered functions that have not been cancelled.
  size_t TEST_GetPendingTaskNum() const {
    InstrumentedMutexLock l(&mutex_);
    return static_cast<size_t>(
        std::count_if(map_.begin(), map_.end(), [](const auto& entry) {
          return entry.second->IsValid();
        }));
  }

  // True if func_name is registered and has not been cancelled.
  bool TEST_HasVaildTask(const std::string& func_name) const {
    InstrumentedMutexLock l(&mutex_);
    auto it = map_.find(func_name);
    return it != map_.end() && it->second->IsValid();
  }
#endif  // NDEBUG

 private:
  // Main loop of the timer thread. Repeatedly looks at the function with the
  // earliest next_run_time_us, reaps it if cancelled, sleeps until it is due,
  // runs it with mutex_ released, then reschedules or drops it. Exits once
  // Shutdown() clears running_.
  void Run() {
    InstrumentedMutexLock l(&mutex_);
    while (running_) {
      if (heap_.empty()) {
        // Nothing scheduled: wait for Add() or Shutdown().
        TEST_SYNC_POINT("Timer::Run::Waiting");
        cond_var_.Wait();
        continue;
      }
      FunctionInfo* current_fn = heap_.top();
      assert(current_fn);
      if (!current_fn->IsValid()) {
        // Cancelled while waiting in the heap: reap it.
        heap_.pop();
        EraseFunctionWithLock(current_fn);
        continue;
      }
      if (current_fn->next_run_time_us > clock_->NowMicros()) {
        // Not due yet: sleep until it is (or until cond_var_ is signalled),
        // then re-evaluate the heap top.
        cond_var_.TimedWait(current_fn->next_run_time_us);
        continue;
      }
      // Make a copy of the function so it won't be changed after
      // mutex_.Unlock().
      std::function<void()> fn = current_fn->fn;
      executing_task_ = true;
      mutex_.Unlock();
      // Execute the work.
      fn();
      mutex_.Lock();
      executing_task_ = false;
      cond_var_.SignalAll();
      // Remove the work from the heap once it is done executing; Add() refuses
      // tasks that would go before it, so it must still be at the top.
      // Note that we are just removing the pointer from the heap. Its memory
      // is still managed in the map (as it holds a unique ptr), and
      // CancelAllWithLock() waits for the executing task before clearing the
      // map, so current_fn is still a valid ptr.
      assert(heap_.top() == current_fn);
      heap_.pop();
      // current_fn may be cancelled already.
      if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
        assert(running_);
        current_fn->next_run_time_us =
            clock_->NowMicros() + current_fn->repeat_every_us;
        // Schedule new work into the heap with new time.
        heap_.push(current_fn);
      } else {
        // If current_fn is cancelled or no need to repeat, remove it from the
        // map to avoid leak.
        EraseFunctionWithLock(current_fn);
      }
    }
  }

  // Cancels every function, waits for a currently executing one (which may
  // release mutex_ temporarily), then empties both the heap and the map.
  // Requires mutex_ to be held.
  void CancelAllWithLock() {
    mutex_.AssertHeld();
    if (map_.empty() && heap_.empty()) {
      return;
    }
    // With mutex_ held, set all tasks to invalid so that they will not be
    // re-queued.
    for (auto& elem : map_) {
      auto& func_info = elem.second;
      assert(func_info);
      func_info->Cancel();
    }
    // WaitForTaskCompleteIfNecessary() may release mutex_
    WaitForTaskCompleteIfNecessary();
    while (!heap_.empty()) {
      heap_.pop();
    }
    map_.clear();
  }

  // A wrapper around std::function to keep track when it should run next
  // and at what frequency.
  struct FunctionInfo {
    // the actual work
    std::function<void()> fn;
    // name of the function
    std::string name;
    // when the function should run next (absolute, in clock_ microseconds)
    uint64_t next_run_time_us;
    // repeat interval; 0 means run once
    uint64_t repeat_every_us;
    // controls whether this function is valid.
    // A function is valid upon construction and until someone explicitly
    // calls `Cancel()`.
    bool valid;

    FunctionInfo(std::function<void()>&& _fn, std::string _name,
                 const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
        : fn(std::move(_fn)),
          name(std::move(_name)),
          next_run_time_us(_next_run_time_us),
          repeat_every_us(_repeat_every_us),
          valid(true) {}

    void Cancel() { valid = false; }

    bool IsValid() const { return valid; }
  };

  // Drops the map entry that owns fn_info; fn_info must not be used after
  // this call. Erasing through an iterator (instead of
  // `map_.erase(fn_info->name)`) avoids handing the container a key reference
  // that lives inside the very element being destroyed.
  // Requires mutex_ to be held.
  void EraseFunctionWithLock(FunctionInfo* fn_info) {
    mutex_.AssertHeld();
    auto it = map_.find(fn_info->name);
    if (it != map_.end()) {
      map_.erase(it);
    }
  }

  // Blocks (releasing mutex_ while waiting) until no task is executing.
  // Requires mutex_ to be held.
  void WaitForTaskCompleteIfNecessary() {
    mutex_.AssertHeld();
    while (executing_task_) {
      TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
      cond_var_.Wait();
    }
  }

  // Orders heap_ as a min-heap on next_run_time_us (earliest run on top).
  struct RunTimeOrder {
    bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) const {
      return f1->next_run_time_us > f2->next_run_time_us;
    }
  };

  SystemClock* clock_;
  // This mutex controls both the heap_ and the map_. It needs to be held for
  // making any changes in them.
  mutable InstrumentedMutex mutex_;
  InstrumentedCondVar cond_var_;
  std::unique_ptr<port::Thread> thread_;
  // True between Start() and Shutdown().
  bool running_;
  // True while the timer thread runs a function with mutex_ released.
  bool executing_task_;

  // Non-owning pointers into map_, ordered by next run time.
  std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
      heap_;
  // In addition to providing a mapping from a function name to a function,
  // it is also responsible for memory management.
  std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
};
} // namespace ROCKSDB_NAMESPACE