mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
09957ded1d
Summary: **This PR updates RepeatableThread::wait, breaking some tests on OS X. The rest of the PR fixes the tests on OS X.** `RepeatableThreadTest.MockEnvTest` uses `MockTimeEnv` and `RepeatableThread`. If `RepeatableThread::wait` calls `TimedWait` with a time smaller than or equal to the current (real) time, `TimedWait` returns immediately on certain platforms, e.g. OS X. #4560 addresses this issue by replacing `TimedWait` with `Wait` in the test. This fixes the test but makes test/production code diverge, which is not optimal for test coverage. This PR proposes an alternative fix which unifies the test and production code paths for `RepeatableThread::wait`. We obtain the current (real) time in seconds and add 10 extra seconds to ensure that `RepeatableThread::wait` invokes `TimedWait` with a time greater than the current (real) time. This is to prevent the `TimedWait` function from returning immediately without sleeping and releasing the mutex. If `TimedWait` returns immediately, the mutex will not be released, and `RepeatableThread::TEST_WaitForRun` never has a chance to execute the callback which, in this case, updates the result returned by `mock_env->NowMicros()`. Consequently, `RepeatableThread::wait` cannot break out of the loop, causing the test to hang. The extra 10 seconds is a best-effort approach because there seems to be no reliable and deterministic way to provide the aforementioned guarantee. By the time `RepeatableThread::wait` is called, there is no guarantee that the `delay + mock_env->NowMicros()` will be greater than the current real time. However, 10 seconds should be sufficient in most cases. We will keep an eye out for possible flakiness of this test. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5107 Differential Revision: D14680885 Pulled By: riversand963 fbshipit-source-id: d1ecbe10e1dacd110bd464cd01e188bfee72b89e
147 lines
3.5 KiB
C++
147 lines
3.5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#pragma once
|
|
|
|
#include <functional>
|
|
#include <string>
|
|
|
|
#include "port/port.h"
|
|
#include "rocksdb/env.h"
|
|
#include "util/mock_time_env.h"
|
|
#include "util/mutexlock.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class RepeatableThread {
|
|
public:
|
|
RepeatableThread(std::function<void()> function,
|
|
const std::string& thread_name, Env* env, uint64_t delay_us,
|
|
uint64_t initial_delay_us = 0)
|
|
: function_(function),
|
|
thread_name_("rocksdb:" + thread_name),
|
|
env_(env),
|
|
delay_us_(delay_us),
|
|
initial_delay_us_(initial_delay_us),
|
|
mutex_(env),
|
|
cond_var_(&mutex_),
|
|
running_(true),
|
|
#ifndef NDEBUG
|
|
waiting_(false),
|
|
run_count_(0),
|
|
#endif
|
|
thread_([this] { thread(); }) {
|
|
}
|
|
|
|
void cancel() {
|
|
{
|
|
InstrumentedMutexLock l(&mutex_);
|
|
if (!running_) {
|
|
return;
|
|
}
|
|
running_ = false;
|
|
cond_var_.SignalAll();
|
|
}
|
|
thread_.join();
|
|
}
|
|
|
|
bool IsRunning() { return running_; }
|
|
|
|
~RepeatableThread() { cancel(); }
|
|
|
|
#ifndef NDEBUG
|
|
// Wait until RepeatableThread starting waiting, call the optional callback,
|
|
// then wait for one run of RepeatableThread. Tests can use provide a
|
|
// custom env object to mock time, and use the callback here to bump current
|
|
// time and trigger RepeatableThread. See repeatable_thread_test for example.
|
|
//
|
|
// Note: only support one caller of this method.
|
|
void TEST_WaitForRun(std::function<void()> callback = nullptr) {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
while (!waiting_) {
|
|
cond_var_.Wait();
|
|
}
|
|
uint64_t prev_count = run_count_;
|
|
if (callback != nullptr) {
|
|
callback();
|
|
}
|
|
cond_var_.SignalAll();
|
|
while (!(run_count_ > prev_count)) {
|
|
cond_var_.Wait();
|
|
}
|
|
}
|
|
#endif
|
|
|
|
private:
|
|
bool wait(uint64_t delay) {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
if (running_ && delay > 0) {
|
|
uint64_t wait_until = env_->NowMicros() + delay;
|
|
#ifndef NDEBUG
|
|
waiting_ = true;
|
|
cond_var_.SignalAll();
|
|
#endif
|
|
while (running_) {
|
|
cond_var_.TimedWait(wait_until);
|
|
if (env_->NowMicros() >= wait_until) {
|
|
break;
|
|
}
|
|
}
|
|
#ifndef NDEBUG
|
|
waiting_ = false;
|
|
#endif
|
|
}
|
|
return running_;
|
|
}
|
|
|
|
void thread() {
|
|
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
|
|
#if __GLIBC_PREREQ(2, 12)
|
|
// Set thread name.
|
|
auto thread_handle = thread_.native_handle();
|
|
int ret __attribute__((__unused__)) =
|
|
pthread_setname_np(thread_handle, thread_name_.c_str());
|
|
assert(ret == 0);
|
|
#endif
|
|
#endif
|
|
|
|
assert(delay_us_ > 0);
|
|
if (!wait(initial_delay_us_)) {
|
|
return;
|
|
}
|
|
do {
|
|
function_();
|
|
#ifndef NDEBUG
|
|
{
|
|
InstrumentedMutexLock l(&mutex_);
|
|
run_count_++;
|
|
cond_var_.SignalAll();
|
|
}
|
|
#endif
|
|
} while (wait(delay_us_));
|
|
}
|
|
|
|
const std::function<void()> function_;
|
|
const std::string thread_name_;
|
|
Env* const env_;
|
|
const uint64_t delay_us_;
|
|
const uint64_t initial_delay_us_;
|
|
|
|
// Mutex lock should be held when accessing running_, waiting_
|
|
// and run_count_.
|
|
InstrumentedMutex mutex_;
|
|
InstrumentedCondVar cond_var_;
|
|
bool running_;
|
|
#ifndef NDEBUG
|
|
// RepeatableThread waiting for timeout.
|
|
bool waiting_;
|
|
// Times function_ had run.
|
|
uint64_t run_count_;
|
|
#endif
|
|
port::Thread thread_;
|
|
};
|
|
|
|
} // namespace rocksdb
|