// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#ifndef ROCKSDB_LITE

#ifndef GFLAGS
#include <cstdio>
int main() { fprintf(stderr, "Please install gflags to run tools\n"); }
#else
#include <atomic>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <limits>
#include <list>
#include <memory>
#include <sstream>
#include <unordered_map>

#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/system_clock.h"
#include "table/block_based/block_builder.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
#include "utilities/persistent_cache/block_cache_tier.h"
#include "utilities/persistent_cache/persistent_cache_tier.h"
#include "utilities/persistent_cache/volatile_tier_impl.h"

DEFINE_int32(nsec, 10, "Duration of the test in seconds");
DEFINE_int32(nthread_write, 1, "Insert threads");
DEFINE_int32(nthread_read, 1, "Lookup threads");
DEFINE_string(path, "/tmp/microbench/blkcache", "Path for cachefile");
DEFINE_string(log_path, "/tmp/log", "Path for the log file");
DEFINE_uint64(cache_size, std::numeric_limits<uint64_t>::max(), "Cache size");
DEFINE_int32(iosize, 4 * 1024, "Read IO size");
DEFINE_int32(writer_iosize, 4 * 1024, "File writer IO size");
DEFINE_int32(writer_qdepth, 1, "File writer qdepth");
DEFINE_bool(enable_pipelined_writes, false, "Enable async writes");
DEFINE_string(cache_type, "block_cache",
              "Cache type. (block_cache, volatile, tiered)");
DEFINE_bool(benchmark, false, "Benchmark mode");
DEFINE_int32(volatile_cache_pct, 10, "Percentage of cache in memory tier.");

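//
// Example invocation (a sketch; the binary name "persistent_cache_bench" is
// an assumption -- use whatever name your build produces for this tool):
//
//   ./persistent_cache_bench --cache_type=tiered \
//     --path=/tmp/microbench/blkcache --cache_size=1073741824 \
//     --volatile_cache_pct=10 --nthread_write=1 --nthread_read=4 \
//     --nsec=30 --benchmark
//
// Leave --benchmark off to additionally verify the content of every block
// read back from the cache.
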
namespace ROCKSDB_NAMESPACE {

std::unique_ptr<PersistentCacheTier> NewVolatileCache() {
  assert(FLAGS_cache_size != std::numeric_limits<uint64_t>::max());
  std::unique_ptr<PersistentCacheTier> pcache(
      new VolatileCacheTier(FLAGS_cache_size));
  return pcache;
}

std::unique_ptr<PersistentCacheTier> NewBlockCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    return nullptr;
  }

  PersistentCacheConfig opt(Env::Default(), FLAGS_path, FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  std::unique_ptr<PersistentCacheTier> cache(new BlockCacheTier(opt));
  Status status = cache->Open();
  if (!status.ok()) {
    fprintf(stderr, "Error opening cache: %s\n", status.ToString().c_str());
    return nullptr;
  }
  return cache;
}

// construct a tiered RAM+Block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache =
      std::shared_ptr<PersistentCacheTier>(new VolatileCacheTier(mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}

std::unique_ptr<PersistentTieredCache> NewTieredCache() {
  std::shared_ptr<Logger> log;
  if (!Env::Default()->NewLogger(FLAGS_log_path, &log).ok()) {
    fprintf(stderr, "Error creating log %s \n", FLAGS_log_path.c_str());
    abort();
  }

  // split the cache budget between the volatile (RAM) and block (disk) tiers
  auto pct = FLAGS_volatile_cache_pct / static_cast<double>(100);
  PersistentCacheConfig opt(Env::Default(), FLAGS_path,
                            (1 - pct) * FLAGS_cache_size, log);
  opt.writer_dispatch_size = FLAGS_writer_iosize;
  opt.writer_qdepth = FLAGS_writer_qdepth;
  opt.pipeline_writes = FLAGS_enable_pipelined_writes;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  return NewTieredCache(FLAGS_cache_size * pct, opt);
}

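// For example, with --cache_size=1073741824 (1 GiB) and the default
// --volatile_cache_pct=10, the RAM tier is sized at roughly 107 MB
// (0.10 * 2^30 bytes) and the block tier gets the remaining ~966 MB.
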
//
// Benchmark driver
//
class CacheTierBenchmark {
 public:
  explicit CacheTierBenchmark(std::shared_ptr<PersistentCacheTier>&& cache)
      : cache_(std::move(cache)) {
    if (FLAGS_nthread_read) {
      fprintf(stdout, "Pre-populating\n");
      Prepop();
      fprintf(stdout, "Pre-population completed\n");
    }

    stats_.Clear();

    // Start IO threads
    std::list<port::Thread> threads;
    Spawn(FLAGS_nthread_write, &threads,
          std::bind(&CacheTierBenchmark::Write, this));
    Spawn(FLAGS_nthread_read, &threads,
          std::bind(&CacheTierBenchmark::Read, this));

    // Wait till FLAGS_nsec and then signal to quit
    StopWatchNano t(SystemClock::Default(), /*auto_start=*/true);
    size_t sec = t.ElapsedNanos() / 1000000000ULL;
    while (!quit_) {
      sec = t.ElapsedNanos() / 1000000000ULL;
      quit_ = sec > size_t(FLAGS_nsec);
      /* sleep override */ sleep(1);
    }

    // Wait for threads to exit
    Join(&threads);
    // Print stats
    PrintStats(sec);
    // Close the cache
    cache_->TEST_Flush();
    cache_->Close();
  }

 private:
  void PrintStats(const size_t sec) {
    std::ostringstream msg;
    msg << "Test stats" << std::endl
        << "* Elapsed: " << sec << " s" << std::endl
        << "* Write Latency:" << std::endl
        << stats_.write_latency_.ToString() << std::endl
        << "* Read Latency:" << std::endl
        << stats_.read_latency_.ToString() << std::endl
        << "* Bytes written:" << std::endl
        << stats_.bytes_written_.ToString() << std::endl
        << "* Bytes read:" << std::endl
        << stats_.bytes_read_.ToString() << std::endl
        << "Cache stats:" << std::endl
        << cache_->PrintStats() << std::endl;
    fprintf(stderr, "%s\n", msg.str().c_str());
  }

  //
  // Insert implementation and corresponding helper functions
  //
  void Prepop() {
    for (uint64_t i = 0; i < 1024 * 1024; ++i) {
      InsertKey(i);
      insert_key_limit_++;
      read_key_limit_++;
    }

    // Wait until data is flushed
    cache_->TEST_Flush();
    // warmup the cache
    for (uint64_t i = 0; i < 1024 * 1024; ReadKey(i++)) {
    }
  }

  void Write() {
    while (!quit_) {
      InsertKey(insert_key_limit_++);
    }
  }

  void InsertKey(const uint64_t key) {
    // construct key
    uint64_t k[3];
    Slice block_key = FillKey(k, key);

    // construct value
    auto block = NewBlock(key);

    // insert
    StopWatchNano timer(SystemClock::Default(), /*auto_start=*/true);
    while (true) {
      Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
      if (status.ok()) {
        break;
      }

      // transient error is possible if we run without pipelining
      assert(!FLAGS_enable_pipelined_writes);
    }

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.write_latency_.Add(elapsed_micro);
    stats_.bytes_written_.Add(FLAGS_iosize);
  }

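  // Note: the timer above wraps the whole retry loop, so when an insert is
  // transiently rejected the retry spin is counted in the write latency
  // histogram rather than being dropped.
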
  //
  // Read implementation
  //
  void Read() {
    while (!quit_) {
      ReadKey(random() % read_key_limit_);
    }
  }

  void ReadKey(const uint64_t val) {
    // construct key
    uint64_t k[3];
    Slice key = FillKey(k, val);

    // Lookup in cache
    StopWatchNano timer(SystemClock::Default(), /*auto_start=*/true);
    std::unique_ptr<char[]> block;
    size_t size;
    Status status = cache_->Lookup(key, &block, &size);
    if (!status.ok()) {
      fprintf(stderr, "%s\n", status.ToString().c_str());
    }
    assert(status.ok());
    assert(size == (size_t) FLAGS_iosize);

    // adjust stats
    const size_t elapsed_micro = timer.ElapsedNanos() / 1000;
    stats_.read_latency_.Add(elapsed_micro);
    stats_.bytes_read_.Add(FLAGS_iosize);

    // verify content
    if (!FLAGS_benchmark) {
      auto expected_block = NewBlock(val);
      assert(memcmp(block.get(), expected_block.get(), FLAGS_iosize) == 0);
    }
  }

  // create data for a key by filling with a certain pattern
  std::unique_ptr<char[]> NewBlock(const uint64_t val) {
    std::unique_ptr<char[]> data(new char[FLAGS_iosize]);
    memset(data.get(), val % 255, FLAGS_iosize);
    return data;
  }

  // spawn threads
  void Spawn(const size_t n, std::list<port::Thread>* threads,
             const std::function<void()>& fn) {
    for (size_t i = 0; i < n; ++i) {
      threads->emplace_back(fn);
    }
  }

  // join threads
  void Join(std::list<port::Thread>* threads) {
    for (auto& th : *threads) {
      th.join();
    }
  }

  // construct key
  Slice FillKey(uint64_t (&k)[3], const uint64_t val) {
    k[0] = k[1] = 0;
    k[2] = val;
    void* p = static_cast<void*>(&k);
    return Slice(static_cast<char*>(p), sizeof(k));
  }

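  // Note: the key is the raw 24-byte native-endian image of {0, 0, val}, so
  // each counter value yields a distinct, fixed-size cache key.
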
  // benchmark stats
  struct Stats {
    void Clear() {
      bytes_written_.Clear();
      bytes_read_.Clear();
      read_latency_.Clear();
      write_latency_.Clear();
    }

    HistogramImpl bytes_written_;
    HistogramImpl bytes_read_;
    HistogramImpl read_latency_;
    HistogramImpl write_latency_;
  };

  std::shared_ptr<PersistentCacheTier> cache_;  // cache implementation
  std::atomic<uint64_t> insert_key_limit_{0};   // data inserted up to
  std::atomic<uint64_t> read_key_limit_{0};     // data can be read safely up to
  std::atomic<bool> quit_{false};               // signals IO threads to stop
  mutable Stats stats_;                         // Stats
};

} // namespace ROCKSDB_NAMESPACE

//
// main
//
int main(int argc, char** argv) {
  GFLAGS_NAMESPACE::SetUsageMessage(std::string("\nUSAGE:\n") +
                                    std::string(argv[0]) + " [OPTIONS]...");
  GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, false);

  std::ostringstream msg;
  msg << "Config" << std::endl
      << "======" << std::endl
      << "* nsec=" << FLAGS_nsec << std::endl
      << "* nthread_write=" << FLAGS_nthread_write << std::endl
      << "* nthread_read=" << FLAGS_nthread_read << std::endl
      << "* path=" << FLAGS_path << std::endl
      << "* cache_size=" << FLAGS_cache_size << std::endl
      << "* iosize=" << FLAGS_iosize << std::endl
      << "* writer_iosize=" << FLAGS_writer_iosize << std::endl
      << "* writer_qdepth=" << FLAGS_writer_qdepth << std::endl
      << "* enable_pipelined_writes=" << FLAGS_enable_pipelined_writes
      << std::endl
      << "* cache_type=" << FLAGS_cache_type << std::endl
      << "* benchmark=" << FLAGS_benchmark << std::endl
      << "* volatile_cache_pct=" << FLAGS_volatile_cache_pct << std::endl;

  fprintf(stderr, "%s\n", msg.str().c_str());

  std::shared_ptr<ROCKSDB_NAMESPACE::PersistentCacheTier> cache;
  if (FLAGS_cache_type == "block_cache") {
    fprintf(stderr, "Using block cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewBlockCache();
  } else if (FLAGS_cache_type == "volatile") {
    fprintf(stderr, "Using volatile cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewVolatileCache();
  } else if (FLAGS_cache_type == "tiered") {
    fprintf(stderr, "Using tiered cache implementation\n");
    cache = ROCKSDB_NAMESPACE::NewTieredCache();
  } else {
    fprintf(stderr, "Unknown option for cache\n");
  }

  assert(cache);
  if (!cache) {
    fprintf(stderr, "Error creating cache\n");
    abort();
  }

  std::unique_ptr<ROCKSDB_NAMESPACE::CacheTierBenchmark> benchmark(
      new ROCKSDB_NAMESPACE::CacheTierBenchmark(std::move(cache)));

  return 0;
}
#endif  // #ifndef GFLAGS
#else
int main(int, char**) { return 0; }
#endif  // ROCKSDB_LITE