C++11 concurrency instead of pthread

This commit is contained in:
Matt Clarkson 2014-08-04 11:38:37 +01:00 committed by Dominic Hamon
parent 6b1a6958c4
commit 373cc41100
6 changed files with 64 additions and 114 deletions

View File

@ -139,8 +139,8 @@ BENCHMARK(BM_MultiThreaded)->Threads(4);
#include <functional>
#include <memory>
#include <pthread.h>
#include <string>
#include <thread>
#include <vector>
#include "macros.h"
@ -263,7 +263,7 @@ class State {
// BenchmarkInstance
SharedState* shared_;
pthread_t thread_;
std::thread thread_;
// Custom label set by the user.
std::string label_;

View File

@ -16,7 +16,6 @@
#include "benchmark/macros.h"
#include "colorprint.h"
#include "commandlineflags.h"
#include "mutex_lock.h"
#include "re.h"
#include "sleep.h"
#include "stat.h"
@ -24,14 +23,15 @@
#include "walltime.h"
#include <sys/time.h>
#include <pthread.h>
#include <semaphore.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <sstream>
DEFINE_string(benchmark_filter, ".",
@ -184,9 +184,9 @@ inline std::string HumanReadableNumber(double n) {
// For non-dense Range, intermediate values are powers of kRangeMultiplier.
static const int kRangeMultiplier = 8;
static pthread_mutex_t benchmark_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t starting_mutex;
pthread_cond_t starting_cv;
static std::mutex benchmark_mutex;
std::mutex starting_mutex;
std::condition_variable starting_cv;
bool running_benchmark = false;
@ -342,7 +342,7 @@ BenchmarkFamilies::~BenchmarkFamilies() {
}
int BenchmarkFamilies::AddBenchmark(Benchmark* family) {
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
// This loop attempts to reuse an entry that was previously removed to avoid
// unncessary growth of the vector.
for (size_t index = 0; index < families_.size(); ++index) {
@ -357,7 +357,7 @@ int BenchmarkFamilies::AddBenchmark(Benchmark* family) {
}
void BenchmarkFamilies::RemoveBenchmark(int index) {
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
families_[index] = NULL;
// Don't shrink families_ here, we might be called by the destructor of
// BenchmarkFamilies which iterates over the vector.
@ -374,7 +374,7 @@ void BenchmarkFamilies::FindBenchmarks(
return;
}
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
for (internal::Benchmark* family : families_) {
if (family == nullptr) continue; // Family was deleted
@ -564,21 +564,16 @@ class State::FastClock {
explicit FastClock(Type type)
: type_(type),
approx_time_(NowMicros()),
bg_done_(false) {
pthread_cond_init(&bg_cond_, nullptr);
pthread_mutex_init(&bg_mutex_, nullptr);
pthread_create(&bg_, NULL, &BGThreadWrapper, this);
}
bg_done_(false),
bg_(BGThreadWrapper, this) { }
~FastClock() {
{
mutex_lock l(&bg_mutex_);
std::unique_lock<std::mutex> l(bg_mutex_);
bg_done_ = true;
pthread_cond_signal(&bg_cond_);
bg_cond_.notify_one();
}
pthread_join(bg_, NULL);
pthread_mutex_destroy(&bg_mutex_);
pthread_cond_destroy(&bg_cond_);
bg_.join();
}
// Returns true if the current time is guaranteed to be past "when_micros".
@ -605,7 +600,7 @@ class State::FastClock {
// function starts running - see UseRealTime).
void InitType(Type type) {
type_ = type;
mutex_lock l(&bg_mutex_);
std::lock_guard<std::mutex> l(bg_mutex_);
std::atomic_store(&approx_time_, NowMicros());
}
@ -613,10 +608,9 @@ class State::FastClock {
Type type_;
std::atomic<int64_t> approx_time_; // Last time measurement taken by bg_
bool bg_done_; // This is used to signal background thread to exit
pthread_t bg_; // Background thread that updates last_time_ once every ms
pthread_mutex_t bg_mutex_;
pthread_cond_t bg_cond_;
std::mutex bg_mutex_;
std::condition_variable bg_cond_;
std::thread bg_; // Background thread that updates last_time_ once every ms
static void* BGThreadWrapper(void* that) {
((FastClock*)that)->BGThread();
@ -624,23 +618,11 @@ class State::FastClock {
}
void BGThread() {
mutex_lock l(&bg_mutex_);
std::unique_lock<std::mutex> l(bg_mutex_);
while (!bg_done_)
{
struct timeval tv;
gettimeofday(&tv, nullptr);
// Set timeout to 1 ms.
uint32_t const timeout = 1000;
struct timespec ts;
ts.tv_sec = tv.tv_sec + (timeout / kNumMicrosPerSecond);
ts.tv_nsec =
(tv.tv_usec + (timeout % kNumMicrosPerSecond)) * kNumNanosPerMicro;
ts.tv_sec += ts.tv_nsec / kNumNanosPerSecond;
ts.tv_nsec %= kNumNanosPerSecond;
pthread_cond_timedwait(&bg_cond_, &bg_mutex_, &ts);
bg_cond_.wait_for(l, std::chrono::milliseconds(1));
std::atomic_store(&approx_time_, NowMicros());
}
}
@ -693,8 +675,8 @@ struct Benchmark::Instance {
struct State::SharedState {
const internal::Benchmark::Instance* instance;
pthread_mutex_t mu;
pthread_cond_t cond;
std::mutex mu;
std::condition_variable cond;
int starting; // Number of threads that have entered STARTING state
int stopping; // Number of threads that have entered STOPPING state
int exited; // Number of threads that have complete exited
@ -708,15 +690,8 @@ struct State::SharedState {
starting(0),
stopping(0),
exited(0),
threads(b == nullptr ? 1 : b->threads) {
pthread_mutex_init(&mu, nullptr);
pthread_cond_init(&cond, nullptr);
}
threads(b == nullptr ? 1 : b->threads) { }
~SharedState() {
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mu);
}
DISALLOW_COPY_AND_ASSIGN(SharedState)
};
@ -732,7 +707,7 @@ Benchmark::~Benchmark() {
}
Benchmark* Benchmark::Arg(int x) {
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
rangeX_.push_back(x);
return this;
}
@ -741,7 +716,7 @@ Benchmark* Benchmark::Range(int start, int limit) {
std::vector<int> arglist;
AddRange(&arglist, start, limit, kRangeMultiplier);
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
for (size_t i = 0; i < arglist.size(); ++i) rangeX_.push_back(arglist[i]);
return this;
}
@ -749,13 +724,13 @@ Benchmark* Benchmark::Range(int start, int limit) {
Benchmark* Benchmark::DenseRange(int start, int limit) {
CHECK_GE(start, 0);
CHECK_LE(start, limit);
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
for (int arg = start; arg <= limit; ++arg) rangeX_.push_back(arg);
return this;
}
Benchmark* Benchmark::ArgPair(int x, int y) {
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
rangeX_.push_back(x);
rangeY_.push_back(y);
return this;
@ -766,7 +741,7 @@ Benchmark* Benchmark::RangePair(int lo1, int hi1, int lo2, int hi2) {
AddRange(&arglist1, lo1, hi1, kRangeMultiplier);
AddRange(&arglist2, lo2, hi2, kRangeMultiplier);
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
rangeX_.resize(arglist1.size());
std::copy(arglist1.begin(), arglist1.end(), rangeX_.begin());
rangeY_.resize(arglist2.size());
@ -781,7 +756,7 @@ Benchmark* Benchmark::Apply(void (*custom_arguments)(Benchmark* benchmark)) {
Benchmark* Benchmark::Threads(int t) {
CHECK_GT(t, 0);
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
thread_counts_.push_back(t);
return this;
}
@ -790,13 +765,13 @@ Benchmark* Benchmark::ThreadRange(int min_threads, int max_threads) {
CHECK_GT(min_threads, 0);
CHECK_GE(max_threads, min_threads);
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
AddRange(&thread_counts_, min_threads, max_threads, 2);
return this;
}
Benchmark* Benchmark::ThreadPerCpu() {
mutex_lock l(&benchmark_mutex);
std::lock_guard<std::mutex> l(benchmark_mutex);
thread_counts_.push_back(NumCPUs());
return this;
}
@ -1017,13 +992,13 @@ bool State::KeepRunning() {
}
if (!ret && shared_->threads > 1 && thread_index == 0){
mutex_lock l(&shared_->mu);
std::unique_lock<std::mutex> l(shared_->mu);
// Block until all other threads have exited. We can then safely cleanup
// without other threads continuing to access shared variables inside the
// user-provided run function.
while (shared_->exited < shared_->threads - 1) {
pthread_cond_wait(&shared_->cond, &shared_->mu);
shared_->cond.wait(l);
}
}
@ -1045,19 +1020,19 @@ void State::ResumeTiming() {
void State::SetBytesProcessed(int64_t bytes) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
stats_->bytes_processed = bytes;
}
void State::SetItemsProcessed(int64_t items) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
stats_->items_processed = items;
}
void State::SetLabel(const std::string& label) {
CHECK_EQ(STATE_STOPPED, state_);
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
shared_->label = label;
}
@ -1083,7 +1058,7 @@ int State::range_y() const {
bool State::StartRunning() {
bool last_thread = false;
{
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
CHECK_EQ(state_, STATE_INITIAL);
state_ = STATE_STARTING;
is_continuation_ = false;
@ -1096,12 +1071,12 @@ bool State::StartRunning() {
clock_->InitType(use_real_time ? FastClock::REAL_TIME
: FastClock::CPU_TIME);
{
mutex_lock l(&starting_mutex);
pthread_cond_broadcast(&starting_cv);
std::lock_guard<std::mutex> l(starting_mutex);
starting_cv.notify_all();
}
} else {
mutex_lock l(&starting_mutex);
pthread_cond_wait(&starting_cv, &starting_mutex);
std::unique_lock<std::mutex> l(starting_mutex);
starting_cv.wait(l);
}
CHECK_EQ(state_, STATE_STARTING);
state_ = STATE_RUNNING;
@ -1161,7 +1136,7 @@ bool State::FinishInterval() {
bool keep_going = false;
{
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
// Either replace the last or add a new data point.
if (is_continuation_)
@ -1198,7 +1173,7 @@ bool State::FinishInterval() {
}
bool State::MaybeStop() {
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
if (shared_->stopping < shared_->threads) {
CHECK_EQ(state_, STATE_STOPPING);
return true;
@ -1211,16 +1186,20 @@ void State::Run() {
stats_->Reset();
shared_->instance->bm->function_(*this);
{
mutex_lock l(&shared_->mu);
std::lock_guard<std::mutex> l(shared_->mu);
shared_->stats.Add(*stats_);
}
}
void State::RunAsThread() {
CHECK_EQ(0, pthread_create(&thread_, nullptr, &State::RunWrapper, this));
thread_ = std::thread(State::RunWrapper, this);
}
void State::Wait() { CHECK_EQ(0, pthread_join(thread_, nullptr)); }
void State::Wait() {
if (thread_.joinable()) {
thread_.join();
}
}
// static
void* State::RunWrapper(void* arg) {
@ -1228,14 +1207,14 @@ void* State::RunWrapper(void* arg) {
CHECK(that != nullptr);
that->Run();
mutex_lock l(&that->shared_->mu);
std::lock_guard<std::mutex> l(that->shared_->mu);
that->shared_->exited++;
if (that->thread_index > 0 &&
that->shared_->exited == that->shared_->threads - 1) {
// All threads but thread 0 have exited the user-provided run function.
// Thread 0 can now wake up and exit.
pthread_cond_signal(&that->shared_->cond);
that->shared_->cond.notify_one();
}
return nullptr;
@ -1301,13 +1280,9 @@ void RunSpecifiedBenchmarks(const BenchmarkReporter* reporter /*= nullptr*/) {
internal::ConsoleReporter default_reporter;
internal::RunMatchingBenchmarks(
spec, reporter == nullptr ? &default_reporter : reporter);
pthread_cond_destroy(&starting_cv);
pthread_mutex_destroy(&starting_mutex);
}
void Initialize(int* argc, const char** argv) {
pthread_mutex_init(&starting_mutex, nullptr);
pthread_cond_init(&starting_cv, nullptr);
walltime::Initialize();
internal::ParseCommandLineFlags(argc, argv);
internal::Benchmark::MeasureOverhead();

View File

@ -1,20 +0,0 @@
#ifndef BENCHMARK_MUTEX_LOCK_H_
#define BENCHMARK_MUTEX_LOCK_H_
#include <pthread.h>
namespace benchmark {
class mutex_lock {
public:
explicit mutex_lock(pthread_mutex_t* mu) : mu_(mu) {
pthread_mutex_lock(mu_);
}
~mutex_lock() { pthread_mutex_unlock(mu_); }
private:
pthread_mutex_t* mu_;
};
} // end namespace benchmark
#endif // BENCHMARK_MUTEX_LOCK_H_

View File

@ -16,7 +16,6 @@
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -28,18 +27,18 @@
#include <iostream>
#include <limits>
#include <mutex>
#include "benchmark/macros.h"
#include "cycleclock.h"
#include "mutex_lock.h"
#include "sleep.h"
namespace benchmark {
namespace {
pthread_once_t cpuinfo_init = PTHREAD_ONCE_INIT;
std::once_flag cpuinfo_init;
double cpuinfo_cycles_per_second = 1.0;
int cpuinfo_num_cpus = 1; // Conservative guess
pthread_mutex_t cputimens_mutex;
std::mutex cputimens_mutex;
#if !defined OS_MACOSX
const int64_t estimate_time_ms = 1000;
@ -76,9 +75,6 @@ bool ReadIntFromFile(const char* file, int* value) {
#endif
void InitializeSystemInfo() {
// TODO: destroy this
pthread_mutex_init(&cputimens_mutex, NULL);
#if defined OS_LINUX || defined OS_CYGWIN
char line[1024];
char* err;
@ -315,7 +311,7 @@ static bool MyCPUUsageCPUTimeNsLocked(double* cputime) {
double MyCPUUsage() {
{
mutex_lock l(&cputimens_mutex);
std::lock_guard<std::mutex> l(cputimens_mutex);
static bool use_cputime_ns = true;
if (use_cputime_ns) {
double value;
@ -344,12 +340,12 @@ double ChildrenCPUUsage() {
#endif // OS_WINDOWS
double CyclesPerSecond(void) {
pthread_once(&cpuinfo_init, &InitializeSystemInfo);
std::call_once(cpuinfo_init, InitializeSystemInfo);
return cpuinfo_cycles_per_second;
}
int NumCPUs(void) {
pthread_once(&cpuinfo_init, &InitializeSystemInfo);
std::call_once(cpuinfo_init, InitializeSystemInfo);
return cpuinfo_num_cpus;
}
} // end namespace benchmark

View File

@ -1,6 +1,7 @@
# Demonstration executable
add_executable(benchmark_test benchmark_test.cc)
target_link_libraries(benchmark_test benchmark ${CMAKE_THREAD_LIBS_INIT})
add_test(benchmark benchmark_test)
# Test harness for regex wrapper
add_executable(re_test ${RE_FILES} "re_test.cc")

View File

@ -7,6 +7,7 @@
#include <limits>
#include <list>
#include <map>
#include <mutex>
#include <set>
#include <sstream>
#include <vector>
@ -34,7 +35,7 @@ std::set<int> ConstructRandomSet(int size) {
return s;
}
pthread_mutex_t test_vector_mu;
std::mutex test_vector_mu;
std::vector<int>* test_vector = nullptr;
} // end namespace
@ -113,23 +114,20 @@ BENCHMARK(BM_StringCompare)->Range(1, 1<<20);
static void BM_SetupTeardown(benchmark::State& state) {
if (state.thread_index == 0) {
pthread_mutex_init(&test_vector_mu, nullptr);
// No need to lock test_vector_mu here as this is running single-threaded.
test_vector = new std::vector<int>();
}
int i = 0;
while (state.KeepRunning()) {
pthread_mutex_lock(&test_vector_mu);
std::lock_guard<std::mutex> l(test_vector_mu);
if (i%2 == 0)
test_vector->push_back(i);
else
test_vector->pop_back();
pthread_mutex_unlock(&test_vector_mu);
++i;
}
if (state.thread_index == 0) {
delete test_vector;
pthread_mutex_destroy(&test_vector_mu);
}
}
BENCHMARK(BM_SetupTeardown)->ThreadPerCpu();