mirror of https://github.com/facebook/rocksdb.git
Don't hold DB mutex for block cache entry stat scans (#8538)
Summary: I previously didn't notice the DB mutex was being held during block cache entry stat scans, probably because I primarily checked for read performance regressions, because they require the block cache and are traditionally latency-sensitive. This change does some refactoring to avoid holding DB mutex and to avoid triggering and waiting for a scan in GetProperty("rocksdb.cfstats"). Some tests have to be updated because now the stats collector is populated in the Cache aggressively on DB startup rather than lazily. (I hope to clean up some of this added complexity in the future.) This change also ensures proper treatment of need_out_of_mutex for non-int DB properties. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8538 Test Plan: Added unit test logic that uses sync points to fail if the DB mutex is held during a scan, covering the various ways that a scan might be triggered. Performance test - the known impact to holding the DB mutex is on TransactionDB, and the easiest way to see the impact is to hack the scan code to almost always miss and take an artificially long time scanning. Here I've injected an unconditional 5s sleep at the call to ApplyToAllEntries. Before (hacked): $ TEST_TMPDIR=/dev/shm ./db_bench.base_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op' randomtransaction : 433.219 micros/op 2308 ops/sec; 0.1 MB/s ( transactions:78999 aborts:0) rocksdb.db.write.micros P50 : 16.135883 P95 : 36.622503 P99 : 66.036115 P100 : 5000614.000000 COUNT : 149677 SUM : 8364856 $ TEST_TMPDIR=/dev/shm ./db_bench.base_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op' randomtransaction : 448.802 micros/op 2228 ops/sec; 0.1 MB/s ( transactions:75999 aborts:0) rocksdb.db.write.micros P50 : 16.629221 P95 : 37.320607 P99 : 72.144341 P100 : 5000871.000000 COUNT : 143995 SUM : 13472323 Notice the 5s P100 write time. After (hacked): $ TEST_TMPDIR=/dev/shm ./db_bench.new_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op' randomtransaction : 303.645 micros/op 3293 ops/sec; 0.1 MB/s ( transactions:98999 aborts:0) rocksdb.db.write.micros P50 : 16.061871 P95 : 33.978834 P99 : 60.018017 P100 : 616315.000000 COUNT : 187619 SUM : 4097407 $ TEST_TMPDIR=/dev/shm ./db_bench.new_xxx -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op' randomtransaction : 310.383 micros/op 3221 ops/sec; 0.1 MB/s ( transactions:96999 aborts:0) rocksdb.db.write.micros P50 : 16.270026 P95 : 35.786844 P99 : 64.302878 P100 : 603088.000000 COUNT : 183819 SUM : 4095918 P100 write is now ~0.6s. Not good, but it's the same even if I completely bypass all the scanning code: $ TEST_TMPDIR=/dev/shm ./db_bench.new_skip -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op' randomtransaction : 311.365 micros/op 3211 ops/sec; 0.1 MB/s ( transactions:96999 aborts:0) rocksdb.db.write.micros P50 : 16.274362 P95 : 36.221184 P99 : 68.809783 P100 : 649808.000000 COUNT : 183819 SUM : 4156767 $ TEST_TMPDIR=/dev/shm ./db_bench.new_skip -benchmarks=randomtransaction,stats -cache_index_and_filter_blocks=1 -bloom_bits=10 -partition_index_and_filters=1 -duration=30 -stats_dump_period_sec=12 -cache_size=100000000 -statistics -transaction_db 2>&1 | egrep 'db.db.write.micros|micros/op' randomtransaction : 308.395 micros/op 3242 ops/sec; 0.1 MB/s ( transactions:97999 aborts:0) rocksdb.db.write.micros P50 : 16.106222 P95 : 37.202403 P99 : 67.081875 P100 : 598091.000000 COUNT : 185714 SUM : 4098832 No substantial difference. Reviewed By: siying Differential Revision: D29738847 Pulled By: pdillinger fbshipit-source-id: 1c5c155f5a1b62e4fea0fd4eeb515a8b7474027b
This commit is contained in:
parent
1e5b631e51
commit
df5dc73bec
|
@ -51,7 +51,8 @@ namespace ROCKSDB_NAMESPACE {
|
|||
template <class Stats>
|
||||
class CacheEntryStatsCollector {
|
||||
public:
|
||||
// Gathers stats and saves results into `stats`
|
||||
// Gather and save stats if saved stats are too old. (Use GetStats() to
|
||||
// read saved stats.)
|
||||
//
|
||||
// Maximum allowed age for a "hit" on saved results is determined by the
|
||||
// two interval parameters. Both set to 0 forces a re-scan. For example
|
||||
|
@ -61,10 +62,9 @@ class CacheEntryStatsCollector {
|
|||
// Justification: scans can vary wildly in duration, e.g. from 0.02 sec
|
||||
// to as much as 20 seconds, so we want to be able to cap the absolute
|
||||
// and relative frequency of scans.
|
||||
void GetStats(Stats *stats, int min_interval_seconds,
|
||||
int min_interval_factor) {
|
||||
void CollectStats(int min_interval_seconds, int min_interval_factor) {
|
||||
// Waits for any pending reader or writer (collector)
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
std::lock_guard<std::mutex> lock(working_mutex_);
|
||||
|
||||
uint64_t max_age_micros =
|
||||
static_cast<uint64_t>(std::max(min_interval_seconds, 0)) * 1000000U;
|
||||
|
@ -79,19 +79,28 @@ class CacheEntryStatsCollector {
|
|||
uint64_t start_time_micros = clock_->NowMicros();
|
||||
if ((start_time_micros - last_end_time_micros_) > max_age_micros) {
|
||||
last_start_time_micros_ = start_time_micros;
|
||||
saved_stats_.BeginCollection(cache_, clock_, start_time_micros);
|
||||
working_stats_.BeginCollection(cache_, clock_, start_time_micros);
|
||||
|
||||
cache_->ApplyToAllEntries(saved_stats_.GetEntryCallback(), {});
|
||||
cache_->ApplyToAllEntries(working_stats_.GetEntryCallback(), {});
|
||||
TEST_SYNC_POINT_CALLBACK(
|
||||
"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries", nullptr);
|
||||
|
||||
uint64_t end_time_micros = clock_->NowMicros();
|
||||
last_end_time_micros_ = end_time_micros;
|
||||
saved_stats_.EndCollection(cache_, clock_, end_time_micros);
|
||||
working_stats_.EndCollection(cache_, clock_, end_time_micros);
|
||||
} else {
|
||||
saved_stats_.SkippedCollection();
|
||||
working_stats_.SkippedCollection();
|
||||
}
|
||||
// Copy to caller
|
||||
|
||||
// Save so that we don't need to wait for an outstanding collection in
|
||||
// order to make of copy of the last saved stats
|
||||
std::lock_guard<std::mutex> lock2(saved_mutex_);
|
||||
saved_stats_ = working_stats_;
|
||||
}
|
||||
|
||||
// Gets saved stats, regardless of age
|
||||
void GetStats(Stats *stats) {
|
||||
std::lock_guard<std::mutex> lock(saved_mutex_);
|
||||
*stats = saved_stats_;
|
||||
}
|
||||
|
||||
|
@ -129,6 +138,7 @@ class CacheEntryStatsCollector {
|
|||
Cache::Priority::HIGH);
|
||||
if (!s.ok()) {
|
||||
assert(h == nullptr);
|
||||
delete new_ptr;
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
@ -145,6 +155,7 @@ class CacheEntryStatsCollector {
|
|||
private:
|
||||
explicit CacheEntryStatsCollector(Cache *cache, SystemClock *clock)
|
||||
: saved_stats_(),
|
||||
working_stats_(),
|
||||
last_start_time_micros_(0),
|
||||
last_end_time_micros_(/*pessimistic*/ 10000000),
|
||||
cache_(cache),
|
||||
|
@ -154,10 +165,14 @@ class CacheEntryStatsCollector {
|
|||
delete static_cast<CacheEntryStatsCollector *>(value);
|
||||
}
|
||||
|
||||
std::mutex mutex_;
|
||||
std::mutex saved_mutex_;
|
||||
Stats saved_stats_;
|
||||
|
||||
std::mutex working_mutex_;
|
||||
Stats working_stats_;
|
||||
uint64_t last_start_time_micros_;
|
||||
uint64_t last_end_time_micros_;
|
||||
|
||||
Cache *const cache_;
|
||||
SystemClock *const clock_;
|
||||
};
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#include <string>
|
||||
|
||||
#include "cache/sharded_cache.h"
|
||||
#include "port/lang.h"
|
||||
#include "port/malloc.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/secondary_cache.h"
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
|
||||
#include "cache/cache_entry_roles.h"
|
||||
#include "cache/lru_cache.h"
|
||||
#include "db/column_family.h"
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/table.h"
|
||||
|
@ -152,13 +153,15 @@ class DBBlockCacheTest : public DBTestBase {
|
|||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
const std::array<size_t, kNumCacheEntryRoles>& GetCacheEntryRoleCountsBg() {
|
||||
const std::array<size_t, kNumCacheEntryRoles> GetCacheEntryRoleCountsBg() {
|
||||
// Verify in cache entry role stats
|
||||
ColumnFamilyHandleImpl* cfh =
|
||||
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
|
||||
InternalStats* internal_stats_ptr = cfh->cfd()->internal_stats();
|
||||
return internal_stats_ptr->TEST_GetCacheEntryRoleStats(/*foreground=*/false)
|
||||
.entry_counts;
|
||||
InternalStats::CacheEntryRoleStats stats;
|
||||
internal_stats_ptr->TEST_GetCacheEntryRoleStats(&stats,
|
||||
/*foreground=*/false);
|
||||
return stats.entry_counts;
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
};
|
||||
|
@ -170,7 +173,13 @@ TEST_F(DBBlockCacheTest, IteratorBlockCacheUsage) {
|
|||
auto options = GetOptions(table_options);
|
||||
InitTable(options);
|
||||
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
|
||||
LRUCacheOptions co;
|
||||
co.capacity = 0;
|
||||
co.num_shard_bits = 0;
|
||||
co.strict_capacity_limit = false;
|
||||
// Needed not to count entry stats collector
|
||||
co.metadata_charge_policy = kDontChargeCacheMetadata;
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(co);
|
||||
table_options.block_cache = cache;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
|
@ -194,7 +203,13 @@ TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
|
|||
auto options = GetOptions(table_options);
|
||||
InitTable(options);
|
||||
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
|
||||
LRUCacheOptions co;
|
||||
co.capacity = 0;
|
||||
co.num_shard_bits = 0;
|
||||
co.strict_capacity_limit = false;
|
||||
// Needed not to count entry stats collector
|
||||
co.metadata_charge_policy = kDontChargeCacheMetadata;
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(co);
|
||||
table_options.block_cache = cache;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
|
@ -265,7 +280,13 @@ TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
|
|||
|
||||
ReadOptions read_options;
|
||||
std::shared_ptr<Cache> compressed_cache = NewLRUCache(1 << 25, 0, false);
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
|
||||
LRUCacheOptions co;
|
||||
co.capacity = 0;
|
||||
co.num_shard_bits = 0;
|
||||
co.strict_capacity_limit = false;
|
||||
// Needed not to count entry stats collector
|
||||
co.metadata_charge_policy = kDontChargeCacheMetadata;
|
||||
std::shared_ptr<Cache> cache = NewLRUCache(co);
|
||||
table_options.block_cache = cache;
|
||||
table_options.no_block_cache = false;
|
||||
table_options.block_cache_compressed = compressed_cache;
|
||||
|
@ -944,10 +965,15 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
|
|||
}
|
||||
|
||||
static void ClearCache(Cache* cache) {
|
||||
auto roles = CopyCacheDeleterRoleMap();
|
||||
std::deque<std::string> keys;
|
||||
Cache::ApplyToAllEntriesOptions opts;
|
||||
auto callback = [&](const Slice& key, void* /*value*/, size_t /*charge*/,
|
||||
Cache::DeleterFn /*deleter*/) {
|
||||
Cache::DeleterFn deleter) {
|
||||
if (roles.find(deleter) == roles.end()) {
|
||||
// Keep the stats collector
|
||||
return;
|
||||
}
|
||||
keys.push_back(key.ToString());
|
||||
};
|
||||
cache->ApplyToAllEntries(callback, opts);
|
||||
|
@ -1126,6 +1152,9 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
|
|||
&h, Cache::Priority::HIGH));
|
||||
ASSERT_GT(cache->GetUsage(), cache->GetCapacity());
|
||||
expected = {};
|
||||
// For CacheEntryStatsCollector
|
||||
expected[static_cast<size_t>(CacheEntryRole::kMisc)] = 1;
|
||||
// For Fill-it-up
|
||||
expected[static_cast<size_t>(CacheEntryRole::kMisc)]++;
|
||||
// Still able to hit on saved stats
|
||||
EXPECT_EQ(prev_expected, GetCacheEntryRoleCountsBg());
|
||||
|
@ -1134,6 +1163,48 @@ TEST_F(DBBlockCacheTest, CacheEntryRoleStats) {
|
|||
EXPECT_EQ(expected, GetCacheEntryRoleCountsBg());
|
||||
|
||||
cache->Release(h);
|
||||
|
||||
// Now we test that the DB mutex is not held during scans, for the ways
|
||||
// we know how to (possibly) trigger them. Without a better good way to
|
||||
// check this, we simply inject an acquire & release of the DB mutex
|
||||
// deep in the stat collection code. If we were already holding the
|
||||
// mutex, that is UB that would at least be found by TSAN.
|
||||
int scan_count = 0;
|
||||
SyncPoint::GetInstance()->SetCallBack(
|
||||
"CacheEntryStatsCollector::GetStats:AfterApplyToAllEntries",
|
||||
[this, &scan_count](void*) {
|
||||
dbfull()->TEST_LockMutex();
|
||||
dbfull()->TEST_UnlockMutex();
|
||||
++scan_count;
|
||||
});
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
// Different things that might trigger a scan, with mock sleeps to
|
||||
// force a miss.
|
||||
env_->MockSleepForSeconds(10000);
|
||||
dbfull()->DumpStats();
|
||||
ASSERT_EQ(scan_count, 1);
|
||||
|
||||
env_->MockSleepForSeconds(10000);
|
||||
ASSERT_TRUE(
|
||||
db_->GetMapProperty(DB::Properties::kBlockCacheEntryStats, &values));
|
||||
ASSERT_EQ(scan_count, 2);
|
||||
|
||||
env_->MockSleepForSeconds(10000);
|
||||
std::string value_str;
|
||||
ASSERT_TRUE(
|
||||
db_->GetProperty(DB::Properties::kBlockCacheEntryStats, &value_str));
|
||||
ASSERT_EQ(scan_count, 3);
|
||||
|
||||
env_->MockSleepForSeconds(10000);
|
||||
ASSERT_TRUE(db_->GetProperty(DB::Properties::kCFStats, &value_str));
|
||||
// To match historical speed, querying this property no longer triggers
|
||||
// a scan, even if results are old. But periodic dump stats should keep
|
||||
// things reasonably updated.
|
||||
ASSERT_EQ(scan_count, /*unchanged*/ 3);
|
||||
|
||||
SyncPoint::GetInstance()->DisableProcessing();
|
||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||
}
|
||||
EXPECT_GE(iterations_tested, 1);
|
||||
}
|
||||
|
|
|
@ -63,6 +63,7 @@
|
|||
#include "memtable/hash_linklist_rep.h"
|
||||
#include "memtable/hash_skiplist_rep.h"
|
||||
#include "monitoring/in_memory_stats_history.h"
|
||||
#include "monitoring/instrumented_mutex.h"
|
||||
#include "monitoring/iostats_context_imp.h"
|
||||
#include "monitoring/perf_context_imp.h"
|
||||
#include "monitoring/persistent_stats_history.h"
|
||||
|
@ -944,18 +945,31 @@ void DBImpl::DumpStats() {
|
|||
if (shutdown_initiated_) {
|
||||
return;
|
||||
}
|
||||
|
||||
TEST_SYNC_POINT("DBImpl::DumpStats:StartRunning");
|
||||
{
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->initialized()) {
|
||||
// Release DB mutex for gathering cache entry stats. Pass over all
|
||||
// column families for this first so that other stats are dumped
|
||||
// near-atomically.
|
||||
InstrumentedMutexUnlock u(&mutex_);
|
||||
cfd->internal_stats()->CollectCacheEntryStats(/*foreground=*/false);
|
||||
}
|
||||
}
|
||||
|
||||
const std::string* property = &DB::Properties::kDBStats;
|
||||
const DBPropertyInfo* property_info = GetPropertyInfo(*property);
|
||||
assert(property_info != nullptr);
|
||||
assert(!property_info->need_out_of_mutex);
|
||||
default_cf_internal_stats_->GetStringProperty(*property_info, *property,
|
||||
&stats);
|
||||
|
||||
property = &DB::Properties::kCFStatsNoFileHistogram;
|
||||
property_info = GetPropertyInfo(*property);
|
||||
assert(property_info != nullptr);
|
||||
assert(!property_info->need_out_of_mutex);
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->initialized()) {
|
||||
cfd->internal_stats()->GetStringProperty(*property_info, *property,
|
||||
|
@ -966,6 +980,7 @@ void DBImpl::DumpStats() {
|
|||
property = &DB::Properties::kCFFileHistogram;
|
||||
property_info = GetPropertyInfo(*property);
|
||||
assert(property_info != nullptr);
|
||||
assert(!property_info->need_out_of_mutex);
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->initialized()) {
|
||||
cfd->internal_stats()->GetStringProperty(*property_info, *property,
|
||||
|
@ -3264,16 +3279,21 @@ bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
|
|||
}
|
||||
return ret_value;
|
||||
} else if (property_info->handle_string) {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return cfd->internal_stats()->GetStringProperty(*property_info, property,
|
||||
value);
|
||||
} else if (property_info->handle_string_dbimpl) {
|
||||
std::string tmp_value;
|
||||
bool ret_value = (this->*(property_info->handle_string_dbimpl))(&tmp_value);
|
||||
if (ret_value) {
|
||||
*value = tmp_value;
|
||||
if (property_info->need_out_of_mutex) {
|
||||
return cfd->internal_stats()->GetStringProperty(*property_info, property,
|
||||
value);
|
||||
} else {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return cfd->internal_stats()->GetStringProperty(*property_info, property,
|
||||
value);
|
||||
}
|
||||
} else if (property_info->handle_string_dbimpl) {
|
||||
if (property_info->need_out_of_mutex) {
|
||||
return (this->*(property_info->handle_string_dbimpl))(value);
|
||||
} else {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return (this->*(property_info->handle_string_dbimpl))(value);
|
||||
}
|
||||
return ret_value;
|
||||
}
|
||||
// Shouldn't reach here since exactly one of handle_string and handle_int
|
||||
// should be non-nullptr.
|
||||
|
@ -3291,9 +3311,14 @@ bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
|
|||
if (property_info == nullptr) {
|
||||
return false;
|
||||
} else if (property_info->handle_map) {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return cfd->internal_stats()->GetMapProperty(*property_info, property,
|
||||
value);
|
||||
if (property_info->need_out_of_mutex) {
|
||||
return cfd->internal_stats()->GetMapProperty(*property_info, property,
|
||||
value);
|
||||
} else {
|
||||
InstrumentedMutexLock l(&mutex_);
|
||||
return cfd->internal_stats()->GetMapProperty(*property_info, property,
|
||||
value);
|
||||
}
|
||||
}
|
||||
// If we reach this point it means that handle_map is not provided for the
|
||||
// requested property
|
||||
|
|
|
@ -394,7 +394,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
|
|||
{DB::Properties::kDBStats,
|
||||
{false, &InternalStats::HandleDBStats, nullptr, nullptr, nullptr}},
|
||||
{DB::Properties::kBlockCacheEntryStats,
|
||||
{false, &InternalStats::HandleBlockCacheEntryStats, nullptr,
|
||||
{true, &InternalStats::HandleBlockCacheEntryStats, nullptr,
|
||||
&InternalStats::HandleBlockCacheEntryStatsMap, nullptr}},
|
||||
{DB::Properties::kSSTables,
|
||||
{false, &InternalStats::HandleSsTables, nullptr, nullptr, nullptr}},
|
||||
|
@ -510,7 +510,7 @@ const std::unordered_map<std::string, DBPropertyInfo>
|
|||
{false, nullptr, &InternalStats::HandleBlockCachePinnedUsage, nullptr,
|
||||
nullptr}},
|
||||
{DB::Properties::kOptionsStatistics,
|
||||
{false, nullptr, nullptr, nullptr,
|
||||
{true, nullptr, nullptr, nullptr,
|
||||
&DBImpl::GetPropertyHandleOptionsStatistics}},
|
||||
};
|
||||
|
||||
|
@ -526,29 +526,41 @@ InternalStats::InternalStats(int num_levels, SystemClock* clock,
|
|||
number_levels_(num_levels),
|
||||
clock_(clock),
|
||||
cfd_(cfd),
|
||||
started_at_(clock->NowMicros()) {}
|
||||
|
||||
Status InternalStats::CollectCacheEntryStats(bool foreground) {
|
||||
// Lazy initialize/reference the collector. It is pinned in cache (through
|
||||
// a shared_ptr) so that it does not get immediately ejected from a full
|
||||
// cache, which would force a re-scan on the next GetStats.
|
||||
if (!cache_entry_stats_collector_) {
|
||||
Cache* block_cache;
|
||||
bool ok = HandleBlockCacheStat(&block_cache);
|
||||
if (ok) {
|
||||
// Extract or create stats collector.
|
||||
Status s = CacheEntryStatsCollector<CacheEntryRoleStats>::GetShared(
|
||||
block_cache, clock_, &cache_entry_stats_collector_);
|
||||
if (!s.ok()) {
|
||||
// Block cache likely under pressure. Scanning could make it worse,
|
||||
// so skip.
|
||||
return s;
|
||||
}
|
||||
started_at_(clock->NowMicros()) {
|
||||
Cache* block_cache = nullptr;
|
||||
bool ok = GetBlockCacheForStats(&block_cache);
|
||||
if (ok) {
|
||||
assert(block_cache);
|
||||
// Extract or create stats collector. Could fail in rare cases.
|
||||
Status s = CacheEntryStatsCollector<CacheEntryRoleStats>::GetShared(
|
||||
block_cache, clock_, &cache_entry_stats_collector_);
|
||||
if (s.ok()) {
|
||||
assert(cache_entry_stats_collector_);
|
||||
} else {
|
||||
return Status::NotFound("block cache not configured");
|
||||
assert(!cache_entry_stats_collector_);
|
||||
}
|
||||
} else {
|
||||
assert(!block_cache);
|
||||
}
|
||||
}
|
||||
|
||||
void InternalStats::TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats,
|
||||
bool foreground) {
|
||||
CollectCacheEntryStats(foreground);
|
||||
if (cache_entry_stats_collector_) {
|
||||
cache_entry_stats_collector_->GetStats(stats);
|
||||
}
|
||||
}
|
||||
|
||||
void InternalStats::CollectCacheEntryStats(bool foreground) {
|
||||
// This function is safe to call from any thread because
|
||||
// cache_entry_stats_collector_ field is const after constructor
|
||||
// and ->GetStats does its own synchronization, which also suffices for
|
||||
// cache_entry_stats_.
|
||||
|
||||
if (!cache_entry_stats_collector_) {
|
||||
return; // nothing to do (e.g. no block cache)
|
||||
}
|
||||
assert(cache_entry_stats_collector_);
|
||||
|
||||
// For "background" collections, strictly cap the collection time by
|
||||
// expanding effective cache TTL. For foreground, be more aggressive about
|
||||
|
@ -556,9 +568,8 @@ Status InternalStats::CollectCacheEntryStats(bool foreground) {
|
|||
int min_interval_seconds = foreground ? 10 : 180;
|
||||
// 1/500 = max of 0.2% of one CPU thread
|
||||
int min_interval_factor = foreground ? 10 : 500;
|
||||
cache_entry_stats_collector_->GetStats(
|
||||
&cache_entry_stats_, min_interval_seconds, min_interval_factor);
|
||||
return Status::OK();
|
||||
cache_entry_stats_collector_->CollectStats(min_interval_seconds,
|
||||
min_interval_factor);
|
||||
}
|
||||
|
||||
std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
|
||||
|
@ -649,21 +660,25 @@ void InternalStats::CacheEntryRoleStats::ToMap(
|
|||
|
||||
bool InternalStats::HandleBlockCacheEntryStats(std::string* value,
|
||||
Slice /*suffix*/) {
|
||||
Status s = CollectCacheEntryStats(/*foreground*/ true);
|
||||
if (!s.ok()) {
|
||||
if (!cache_entry_stats_collector_) {
|
||||
return false;
|
||||
}
|
||||
*value = cache_entry_stats_.ToString(clock_);
|
||||
CollectCacheEntryStats(/*foreground*/ true);
|
||||
CacheEntryRoleStats stats;
|
||||
cache_entry_stats_collector_->GetStats(&stats);
|
||||
*value = stats.ToString(clock_);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool InternalStats::HandleBlockCacheEntryStatsMap(
|
||||
std::map<std::string, std::string>* values, Slice /*suffix*/) {
|
||||
Status s = CollectCacheEntryStats(/*foreground*/ true);
|
||||
if (!s.ok()) {
|
||||
if (!cache_entry_stats_collector_) {
|
||||
return false;
|
||||
}
|
||||
cache_entry_stats_.ToMap(values, clock_);
|
||||
CollectCacheEntryStats(/*foreground*/ true);
|
||||
CacheEntryRoleStats stats;
|
||||
cache_entry_stats_collector_->GetStats(&stats);
|
||||
stats.ToMap(values, clock_);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -1123,7 +1138,7 @@ bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
|
|||
return *value > 0 && *value < std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
|
||||
bool InternalStats::GetBlockCacheForStats(Cache** block_cache) {
|
||||
assert(block_cache != nullptr);
|
||||
auto* table_factory = cfd_->ioptions()->table_factory.get();
|
||||
assert(table_factory != nullptr);
|
||||
|
@ -1135,7 +1150,7 @@ bool InternalStats::HandleBlockCacheStat(Cache** block_cache) {
|
|||
bool InternalStats::HandleBlockCacheCapacity(uint64_t* value, DBImpl* /*db*/,
|
||||
Version* /*version*/) {
|
||||
Cache* block_cache;
|
||||
bool ok = HandleBlockCacheStat(&block_cache);
|
||||
bool ok = GetBlockCacheForStats(&block_cache);
|
||||
if (!ok) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1146,7 +1161,7 @@ bool InternalStats::HandleBlockCacheCapacity(uint64_t* value, DBImpl* /*db*/,
|
|||
bool InternalStats::HandleBlockCacheUsage(uint64_t* value, DBImpl* /*db*/,
|
||||
Version* /*version*/) {
|
||||
Cache* block_cache;
|
||||
bool ok = HandleBlockCacheStat(&block_cache);
|
||||
bool ok = GetBlockCacheForStats(&block_cache);
|
||||
if (!ok) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1157,7 +1172,7 @@ bool InternalStats::HandleBlockCacheUsage(uint64_t* value, DBImpl* /*db*/,
|
|||
bool InternalStats::HandleBlockCachePinnedUsage(uint64_t* value, DBImpl* /*db*/,
|
||||
Version* /*version*/) {
|
||||
Cache* block_cache;
|
||||
bool ok = HandleBlockCacheStat(&block_cache);
|
||||
bool ok = GetBlockCacheForStats(&block_cache);
|
||||
if (!ok) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1504,7 +1519,8 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
|
|||
vstorage->GetTotalBlobFileSize() / kGB);
|
||||
value->append(buf);
|
||||
|
||||
double seconds_up = (clock_->NowMicros() - started_at_ + 1) / kMicrosInSec;
|
||||
uint64_t now_micros = clock_->NowMicros();
|
||||
double seconds_up = (now_micros - started_at_ + 1) / kMicrosInSec;
|
||||
double interval_seconds_up = seconds_up - cf_stats_snapshot_.seconds_up;
|
||||
snprintf(buf, sizeof(buf), "Uptime(secs): %.1f total, %.1f interval\n",
|
||||
seconds_up, interval_seconds_up);
|
||||
|
@ -1619,14 +1635,20 @@ void InternalStats::DumpCFStatsNoFileHistogram(std::string* value) {
|
|||
cf_stats_snapshot_.comp_stats = compaction_stats_sum;
|
||||
cf_stats_snapshot_.stall_count = total_stall_count;
|
||||
|
||||
// Always treat CFStats context as "background"
|
||||
Status s = CollectCacheEntryStats(/*foreground=*/false);
|
||||
if (s.ok()) {
|
||||
value->append(cache_entry_stats_.ToString(clock_));
|
||||
} else {
|
||||
value->append("Block cache: ");
|
||||
value->append(s.ToString());
|
||||
value->append("\n");
|
||||
// Do not gather cache entry stats during CFStats because DB
|
||||
// mutex is held. Only dump last cached collection (rely on DB
|
||||
// periodic stats dump to update)
|
||||
if (cache_entry_stats_collector_) {
|
||||
CacheEntryRoleStats stats;
|
||||
// thread safe
|
||||
cache_entry_stats_collector_->GetStats(&stats);
|
||||
|
||||
constexpr uint64_t kDayInMicros = uint64_t{86400} * 1000000U;
|
||||
|
||||
// Skip if stats are extremely old (> 1 day, incl not yet populated)
|
||||
if (now_micros - stats.last_end_time_micros_ < kDayInMicros) {
|
||||
value->append(stats.ToString(clock_));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -392,7 +392,6 @@ class InternalStats {
|
|||
cf_stats_count_[i] = 0;
|
||||
cf_stats_value_[i] = 0;
|
||||
}
|
||||
cache_entry_stats_.Clear();
|
||||
for (auto& comp_stat : comp_stats_) {
|
||||
comp_stat.Clear();
|
||||
}
|
||||
|
@ -459,20 +458,20 @@ class InternalStats {
|
|||
bool GetIntPropertyOutOfMutex(const DBPropertyInfo& property_info,
|
||||
Version* version, uint64_t* value);
|
||||
|
||||
// Unless there is a recent enough collection of the stats, collect and
|
||||
// saved new cache entry stats. If `foreground`, require data to be more
|
||||
// recent to skip re-collection.
|
||||
//
|
||||
// This should only be called while NOT holding the DB mutex.
|
||||
void CollectCacheEntryStats(bool foreground);
|
||||
|
||||
const uint64_t* TEST_GetCFStatsValue() const { return cf_stats_value_; }
|
||||
|
||||
const std::vector<CompactionStats>& TEST_GetCompactionStats() const {
|
||||
return comp_stats_;
|
||||
}
|
||||
|
||||
const CacheEntryRoleStats& TEST_GetCacheEntryRoleStats(bool foreground) {
|
||||
Status s = CollectCacheEntryStats(foreground);
|
||||
if (!s.ok()) {
|
||||
assert(false);
|
||||
cache_entry_stats_.Clear();
|
||||
}
|
||||
return cache_entry_stats_;
|
||||
}
|
||||
void TEST_GetCacheEntryRoleStats(CacheEntryRoleStats* stats, bool foreground);
|
||||
|
||||
// Store a mapping from the user-facing DB::Properties string to our
|
||||
// DBPropertyInfo struct used internally for retrieving properties.
|
||||
|
@ -492,16 +491,18 @@ class InternalStats {
|
|||
void DumpCFStatsNoFileHistogram(std::string* value);
|
||||
void DumpCFFileHistogram(std::string* value);
|
||||
|
||||
bool HandleBlockCacheStat(Cache** block_cache);
|
||||
|
||||
Status CollectCacheEntryStats(bool foreground);
|
||||
bool GetBlockCacheForStats(Cache** block_cache);
|
||||
|
||||
// Per-DB stats
|
||||
std::atomic<uint64_t> db_stats_[kIntStatsNumMax];
|
||||
// Per-ColumnFamily stats
|
||||
uint64_t cf_stats_value_[INTERNAL_CF_STATS_ENUM_MAX];
|
||||
uint64_t cf_stats_count_[INTERNAL_CF_STATS_ENUM_MAX];
|
||||
CacheEntryRoleStats cache_entry_stats_;
|
||||
// Initialize/reference the collector in constructor so that we don't need
|
||||
// additional synchronization in InternalStats, relying on synchronization
|
||||
// in CacheEntryStatsCollector::GetStats. This collector is pinned in cache
|
||||
// (through a shared_ptr) so that it does not get immediately ejected from
|
||||
// a full cache, which would force a re-scan on the next GetStats.
|
||||
std::shared_ptr<CacheEntryStatsCollector<CacheEntryRoleStats>>
|
||||
cache_entry_stats_collector_;
|
||||
// Per-ColumnFamily/level compaction stats
|
||||
|
|
|
@ -578,7 +578,7 @@ class Statistics {
|
|||
// Resets all ticker and histogram stats
|
||||
virtual Status Reset() { return Status::NotSupported("Not implemented"); }
|
||||
|
||||
// String representation of the statistic object.
|
||||
// String representation of the statistic object. Must be thread-safe.
|
||||
virtual std::string ToString() const {
|
||||
// Do nothing by default
|
||||
return std::string("ToString(): not implemented");
|
||||
|
|
|
@ -58,7 +58,8 @@ public class MemoryUtilTest {
|
|||
db.getAggregatedLongProperty(UNFLUSHED_MEMTABLE_SIZE));
|
||||
assertThat(usage.get(MemoryUsageType.kTableReadersTotal)).isEqualTo(
|
||||
db.getAggregatedLongProperty(TABLE_READERS));
|
||||
assertThat(usage.get(MemoryUsageType.kCacheTotal)).isEqualTo(0);
|
||||
// TODO(peterd): disable block cache entry stats and check for 0
|
||||
assertThat(usage.get(MemoryUsageType.kCacheTotal)).isLessThan(1024);
|
||||
|
||||
db.put(key, value);
|
||||
db.flush(flushOptions);
|
||||
|
|
|
@ -51,8 +51,7 @@ class InstrumentedMutex {
|
|||
int stats_code_;
|
||||
};
|
||||
|
||||
// A wrapper class for port::Mutex that provides additional layer
|
||||
// for collecting stats and instrumentation.
|
||||
// RAII wrapper for InstrumentedMutex
|
||||
class InstrumentedMutexLock {
|
||||
public:
|
||||
explicit InstrumentedMutexLock(InstrumentedMutex* mutex) : mutex_(mutex) {
|
||||
|
@ -69,6 +68,22 @@ class InstrumentedMutexLock {
|
|||
void operator=(const InstrumentedMutexLock&) = delete;
|
||||
};
|
||||
|
||||
// RAII wrapper for temporary releasing InstrumentedMutex inside
|
||||
// InstrumentedMutexLock
|
||||
class InstrumentedMutexUnlock {
|
||||
public:
|
||||
explicit InstrumentedMutexUnlock(InstrumentedMutex* mutex) : mutex_(mutex) {
|
||||
mutex_->Unlock();
|
||||
}
|
||||
|
||||
~InstrumentedMutexUnlock() { mutex_->Lock(); }
|
||||
|
||||
private:
|
||||
InstrumentedMutex* const mutex_;
|
||||
InstrumentedMutexUnlock(const InstrumentedMutexUnlock&) = delete;
|
||||
void operator=(const InstrumentedMutexUnlock&) = delete;
|
||||
};
|
||||
|
||||
class InstrumentedCondVar {
|
||||
public:
|
||||
explicit InstrumentedCondVar(InstrumentedMutex* instrumented_mutex)
|
||||
|
|
|
@ -4,7 +4,9 @@
|
|||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include "rocksdb/utilities/sim_cache.h"
|
||||
|
||||
#include <cstdlib>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "port/stack_trace.h"
|
||||
|
||||
|
@ -87,6 +89,8 @@ TEST_F(SimCacheTest, SimCache) {
|
|||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
Reopen(options);
|
||||
RecordCacheCounters(options);
|
||||
// due to cache entry stats collector
|
||||
uint64_t base_misses = simCache->get_miss_counter();
|
||||
|
||||
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks);
|
||||
Iterator* iter = nullptr;
|
||||
|
@ -99,8 +103,8 @@ TEST_F(SimCacheTest, SimCache) {
|
|||
CheckCacheCounters(options, 1, 0, 1, 0);
|
||||
iterators[i].reset(iter);
|
||||
}
|
||||
ASSERT_EQ(kNumBlocks,
|
||||
simCache->get_hit_counter() + simCache->get_miss_counter());
|
||||
ASSERT_EQ(kNumBlocks, simCache->get_hit_counter() +
|
||||
simCache->get_miss_counter() - base_misses);
|
||||
ASSERT_EQ(0, simCache->get_hit_counter());
|
||||
size_t usage = simCache->GetUsage();
|
||||
ASSERT_LT(0, usage);
|
||||
|
@ -137,8 +141,8 @@ TEST_F(SimCacheTest, SimCache) {
|
|||
CheckCacheCounters(options, 1, 0, 1, 0);
|
||||
}
|
||||
ASSERT_EQ(0, simCache->GetPinnedUsage());
|
||||
ASSERT_EQ(3 * kNumBlocks + 1,
|
||||
simCache->get_hit_counter() + simCache->get_miss_counter());
|
||||
ASSERT_EQ(3 * kNumBlocks + 1, simCache->get_hit_counter() +
|
||||
simCache->get_miss_counter() - base_misses);
|
||||
ASSERT_EQ(6, simCache->get_hit_counter());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue