From 78a309bf867b19c1b01dfe86169016cf94652d91 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Tue, 11 May 2021 16:16:11 -0700 Subject: [PATCH] New Cache API for gathering statistics (#8225) Summary: Adds a new Cache::ApplyToAllEntries API that we expect to use (in follow-up PRs) for efficiently gathering block cache statistics. Notable features vs. old ApplyToAllCacheEntries: * Includes key and deleter (in addition to value and charge). We could have passed in a Handle but then more virtual function calls would be needed to get the "fields" of each entry. We expect to use the 'deleter' to identify the origin of entries, perhaps even more. * Heavily tuned to minimize latency impact on operating cache. It does this by iterating over small sections of each cache shard while cycling through the shards. * Supports tuning roughly how many entries to operate on for each lock acquire and release, to control the impact on the latency of other operations without excessive lock acquire & release. The right balance can depend on the cost of the callback. Good default seems to be around 256. * There should be no need to disable thread safety. (I would expect uncontended locks to be sufficiently fast.) I have enhanced cache_bench to validate this approach: * Reports a histogram of ns per operation, so we can look at the ditribution of times, not just throughput (average). * Can add a thread for simulated "gather stats" which calls ApplyToAllEntries at a specified interval. We also generate a histogram of time to run ApplyToAllEntries. To make the iteration over some entries of each shard work as cleanly as possible, even with resize between next set of entries, I have re-arranged which hash bits are used for sharding and which for indexing within a shard. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8225 Test Plan: A couple of unit tests are added, but primary validation is manual, as the primary risk is to performance. The primary validation is using cache_bench to ensure that neither the minor hashing changes nor the simulated stats gathering significantly impact QPS or latency distribution. Note that adding op latency histogram seriously impacts the benchmark QPS, so for a fair baseline, we need the cache_bench changes (except remove simulated stat gathering to make it compile). In short, we don't see any reproducible difference in ops/sec or op latency unless we are gathering stats nearly continuously. Test uses 10GB block cache with 8KB values to be somewhat realistic in the number of items to iterate over. Baseline typical output: ``` Complete in 92.017 s; Rough parallel ops/sec = 869401 Thread ops/sec = 54662 Operation latency (ns): Count: 80000000 Average: 11223.9494 StdDev: 29.61 Min: 0 Median: 7759.3973 Max: 9620500 Percentiles: P50: 7759.40 P75: 14190.73 P99: 46922.75 P99.9: 77509.84 P99.99: 217030.58 ------------------------------------------------------ [ 0, 1 ] 68 0.000% 0.000% ( 2900, 4400 ] 89 0.000% 0.000% ( 4400, 6600 ] 33630240 42.038% 42.038% ######## ( 6600, 9900 ] 18129842 22.662% 64.700% ##### ( 9900, 14000 ] 7877533 9.847% 74.547% ## ( 14000, 22000 ] 15193238 18.992% 93.539% #### ( 22000, 33000 ] 3037061 3.796% 97.335% # ( 33000, 50000 ] 1626316 2.033% 99.368% ( 50000, 75000 ] 421532 0.527% 99.895% ( 75000, 110000 ] 56910 0.071% 99.966% ( 110000, 170000 ] 16134 0.020% 99.986% ( 170000, 250000 ] 5166 0.006% 99.993% ( 250000, 380000 ] 3017 0.004% 99.996% ( 380000, 570000 ] 1337 0.002% 99.998% ( 570000, 860000 ] 805 0.001% 99.999% ( 860000, 1200000 ] 319 0.000% 100.000% ( 1200000, 1900000 ] 231 0.000% 100.000% ( 1900000, 2900000 ] 100 0.000% 100.000% ( 2900000, 4300000 ] 39 0.000% 100.000% ( 4300000, 6500000 ] 16 0.000% 100.000% ( 6500000, 9800000 ] 7 0.000% 100.000% ``` New, gather_stats=false. Median thread ops/sec of 5 runs: ``` Complete in 92.030 s; Rough parallel ops/sec = 869285 Thread ops/sec = 54458 Operation latency (ns): Count: 80000000 Average: 11298.1027 StdDev: 42.18 Min: 0 Median: 7722.0822 Max: 6398720 Percentiles: P50: 7722.08 P75: 14294.68 P99: 47522.95 P99.9: 85292.16 P99.99: 228077.78 ------------------------------------------------------ [ 0, 1 ] 109 0.000% 0.000% ( 2900, 4400 ] 793 0.001% 0.001% ( 4400, 6600 ] 34054563 42.568% 42.569% ######### ( 6600, 9900 ] 17482646 21.853% 64.423% #### ( 9900, 14000 ] 7908180 9.885% 74.308% ## ( 14000, 22000 ] 15032072 18.790% 93.098% #### ( 22000, 33000 ] 3237834 4.047% 97.145% # ( 33000, 50000 ] 1736882 2.171% 99.316% ( 50000, 75000 ] 446851 0.559% 99.875% ( 75000, 110000 ] 68251 0.085% 99.960% ( 110000, 170000 ] 18592 0.023% 99.983% ( 170000, 250000 ] 7200 0.009% 99.992% ( 250000, 380000 ] 3334 0.004% 99.997% ( 380000, 570000 ] 1393 0.002% 99.998% ( 570000, 860000 ] 700 0.001% 99.999% ( 860000, 1200000 ] 293 0.000% 100.000% ( 1200000, 1900000 ] 196 0.000% 100.000% ( 1900000, 2900000 ] 69 0.000% 100.000% ( 2900000, 4300000 ] 32 0.000% 100.000% ( 4300000, 6500000 ] 10 0.000% 100.000% ``` New, gather_stats=true, 1 second delay between scans. Scans take about 1 second here so it's spending about 50% time scanning. Still the effect on ops/sec and latency seems to be in the noise. Median thread ops/sec of 5 runs: ``` Complete in 91.890 s; Rough parallel ops/sec = 870608 Thread ops/sec = 54551 Operation latency (ns): Count: 80000000 Average: 11311.2629 StdDev: 45.28 Min: 0 Median: 7686.5458 Max: 10018340 Percentiles: P50: 7686.55 P75: 14481.95 P99: 47232.60 P99.9: 79230.18 P99.99: 232998.86 ------------------------------------------------------ [ 0, 1 ] 71 0.000% 0.000% ( 2900, 4400 ] 291 0.000% 0.000% ( 4400, 6600 ] 34492060 43.115% 43.116% ######### ( 6600, 9900 ] 16727328 20.909% 64.025% #### ( 9900, 14000 ] 7845828 9.807% 73.832% ## ( 14000, 22000 ] 15510654 19.388% 93.220% #### ( 22000, 33000 ] 3216533 4.021% 97.241% # ( 33000, 50000 ] 1680859 2.101% 99.342% ( 50000, 75000 ] 439059 0.549% 99.891% ( 75000, 110000 ] 60540 0.076% 99.967% ( 110000, 170000 ] 14649 0.018% 99.985% ( 170000, 250000 ] 5242 0.007% 99.991% ( 250000, 380000 ] 3260 0.004% 99.995% ( 380000, 570000 ] 1599 0.002% 99.997% ( 570000, 860000 ] 1043 0.001% 99.999% ( 860000, 1200000 ] 471 0.001% 99.999% ( 1200000, 1900000 ] 275 0.000% 100.000% ( 1900000, 2900000 ] 143 0.000% 100.000% ( 2900000, 4300000 ] 60 0.000% 100.000% ( 4300000, 6500000 ] 27 0.000% 100.000% ( 6500000, 9800000 ] 7 0.000% 100.000% ( 9800000, 14000000 ] 1 0.000% 100.000% Gather stats latency (us): Count: 46 Average: 980387.5870 StdDev: 60911.18 Min: 879155 Median: 1033777.7778 Max: 1261431 Percentiles: P50: 1033777.78 P75: 1120666.67 P99: 1261431.00 P99.9: 1261431.00 P99.99: 1261431.00 ------------------------------------------------------ ( 860000, 1200000 ] 45 97.826% 97.826% #################### ( 1200000, 1900000 ] 1 2.174% 100.000% Most recent cache entry stats: Number of entries: 1295133 Total charge: 9.88 GB Average key size: 23.4982 Average charge: 8.00 KB Unique deleters: 3 ``` Reviewed By: mrambacher Differential Revision: D28295742 Pulled By: pdillinger fbshipit-source-id: bbc4a552f91ba0fe10e5cc025c42cef5a81f2b95 --- HISTORY.md | 1 + cache/cache_bench.cc | 222 ++++++++++++++++++++----- cache/cache_test.cc | 87 +++++++++- cache/clock_cache.cc | 56 +++++-- cache/lru_cache.cc | 105 ++++++++---- cache/lru_cache.h | 38 +++-- cache/lru_cache_test.cc | 7 +- cache/sharded_cache.cc | 72 +++++--- cache/sharded_cache.h | 43 ++--- db/db_compaction_test.cc | 4 + db/db_test_util.h | 7 + include/rocksdb/cache.h | 48 +++++- utilities/simulator_cache/sim_cache.cc | 7 + 13 files changed, 537 insertions(+), 160 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 91ced2d716..1f23fac2f7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -23,6 +23,7 @@ * Removed unused structure `CompactionFilterContext`. * The `skip_filters` parameter to SstFileWriter is now considered deprecated. Use `BlockBasedTableOptions::filter_policy` to control generation of filters. * ClockCache is known to have bugs that could lead to crash or corruption, so should not be used until fixed. Use NewLRUCache instead. +* Added a new pure virtual function `ApplyToAllEntries` to `Cache`, to replace `ApplyToAllCacheEntries`. Custom `Cache` implementations must add an implementation. Because this function is for gathering statistics, an empty implementation could be acceptable for some applications. * Added the ObjectRegistry to the ConfigOptions class. This registry instance will be used to find any customizable loadable objects during initialization. * Expanded the ObjectRegistry functionality to allow nested ObjectRegistry instances. Added methods to register a set of functions with the registry/library as a group. * Deprecated backupable_db.h and BackupableDBOptions in favor of new versions with appropriate names: backup_engine.h and BackupEngineOptions. Old API compatibility is preserved. diff --git a/cache/cache_bench.cc b/cache/cache_bench.cc index 48cf2b44d1..b7de5ae86b 100644 --- a/cache/cache_bench.cc +++ b/cache/cache_bench.cc @@ -3,30 +3,34 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). -#ifndef GFLAGS +#include #include +#include +#include +#include + +#include "monitoring/histogram.h" +#include "port/port.h" +#include "rocksdb/cache.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/system_clock.h" +#include "table/block_based/cachable_entry.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/mutexlock.h" +#include "util/random.h" +#include "util/stop_watch.h" +#include "util/string_util.h" + +#ifndef GFLAGS int main() { fprintf(stderr, "Please install gflags to run rocksdb tools\n"); return 1; } #else -#include - -#include -#include -#include - -#include "port/port.h" -#include "rocksdb/cache.h" -#include "rocksdb/db.h" -#include "rocksdb/env.h" -#include "rocksdb/system_clock.h" -#include "util/coding.h" #include "util/gflags_compat.h" -#include "util/hash.h" -#include "util/mutexlock.h" -#include "util/random.h" using GFLAGS_NAMESPACE::ParseCommandLineFlags; @@ -41,8 +45,7 @@ DEFINE_uint32(num_shard_bits, 6, "shard_bits."); DEFINE_double(resident_ratio, 0.25, "Ratio of keys fitting in cache to keyspace."); -DEFINE_uint64(ops_per_thread, 0, - "Number of operations per thread. (Default: 5 * keyspace size)"); +DEFINE_uint64(ops_per_thread, 2000000U, "Number of operations per thread."); DEFINE_uint32(value_bytes, 8 * KiB, "Size of each value added."); DEFINE_uint32(skew, 5, "Degree of skew in key selection"); @@ -57,6 +60,15 @@ DEFINE_uint32(lookup_percent, 10, "Ratio of lookup to total workload (expressed as a percentage)"); DEFINE_uint32(erase_percent, 1, "Ratio of erase to total workload (expressed as a percentage)"); +DEFINE_bool(gather_stats, false, + "Whether to periodically simulate gathering block cache stats, " + "using one more thread."); +DEFINE_uint32( + gather_stats_sleep_ms, 1000, + "How many milliseconds to sleep between each gathering of stats."); + +DEFINE_uint32(gather_stats_entries_per_lock, 256, + "For Cache::ApplyToAllEntries"); DEFINE_bool(use_clock_cache, false, ""); @@ -124,6 +136,8 @@ struct ThreadState { uint32_t tid; Random64 rnd; SharedState* shared; + HistogramImpl latency_ns_hist; + uint64_t duration_us = 0; ThreadState(uint32_t index, SharedState* _shared) : tid(index), rnd(1000 + index), shared(_shared) {} @@ -160,7 +174,15 @@ char* createValue(Random64& rnd) { return rv; } -void deleter(const Slice& /*key*/, void* value) { +// Different deleters to simulate using deleter to gather +// stats on the code origin and kind of cache entries. +void deleter1(const Slice& /*key*/, void* value) { + delete[] static_cast(value); +} +void deleter2(const Slice& /*key*/, void* value) { + delete[] static_cast(value); +} +void deleter3(const Slice& /*key*/, void* value) { delete[] static_cast(value); } } // namespace @@ -194,9 +216,6 @@ class CacheBench { } else { cache_ = NewLRUCache(FLAGS_cache_size, FLAGS_num_shard_bits); } - if (FLAGS_ops_per_thread == 0) { - FLAGS_ops_per_thread = 5 * max_key_; - } } ~CacheBench() {} @@ -206,28 +225,33 @@ class CacheBench { KeyGen keygen; for (uint64_t i = 0; i < 2 * FLAGS_cache_size; i += FLAGS_value_bytes) { cache_->Insert(keygen.GetRand(rnd, max_key_), createValue(rnd), - FLAGS_value_bytes, &deleter); + FLAGS_value_bytes, &deleter1); } } bool Run() { - ROCKSDB_NAMESPACE::Env* env = ROCKSDB_NAMESPACE::Env::Default(); - const auto& clock = env->GetSystemClock(); + const auto clock = SystemClock::Default().get(); PrintEnv(); SharedState shared(this); std::vector > threads(FLAGS_threads); for (uint32_t i = 0; i < FLAGS_threads; i++) { threads[i].reset(new ThreadState(i, &shared)); - env->StartThread(ThreadBody, threads[i].get()); + std::thread(ThreadBody, threads[i].get()).detach(); } + + HistogramImpl stats_hist; + std::string stats_report; + std::thread stats_thread(StatsBody, &shared, &stats_hist, &stats_report); + + uint64_t start_time; { MutexLock l(shared.GetMutex()); while (!shared.AllInitialized()) { shared.GetCondVar()->Wait(); } // Record start time - uint64_t start_time = clock->NowMicros(); + start_time = clock->NowMicros(); // Start all threads shared.SetStart(); @@ -237,14 +261,44 @@ class CacheBench { while (!shared.AllDone()) { shared.GetCondVar()->Wait(); } - - // Record end time - uint64_t end_time = clock->NowMicros(); - double elapsed = static_cast(end_time - start_time) * 1e-6; - uint32_t qps = static_cast( - static_cast(FLAGS_threads * FLAGS_ops_per_thread) / elapsed); - fprintf(stdout, "Complete in %.3f s; QPS = %u\n", elapsed, qps); } + + // Stats gathering is considered background work. This time measurement + // is for foreground work, and not really ideal for that. See below. + uint64_t end_time = clock->NowMicros(); + stats_thread.join(); + + // Wall clock time - includes idle time if threads + // finish at different times (not ideal). + double elapsed_secs = static_cast(end_time - start_time) * 1e-6; + uint32_t ops_per_sec = static_cast( + 1.0 * FLAGS_threads * FLAGS_ops_per_thread / elapsed_secs); + printf("Complete in %.3f s; Rough parallel ops/sec = %u\n", elapsed_secs, + ops_per_sec); + + // Total time in each thread (more accurate throughput measure) + elapsed_secs = 0; + for (uint32_t i = 0; i < FLAGS_threads; i++) { + elapsed_secs += threads[i]->duration_us * 1e-6; + } + ops_per_sec = static_cast(1.0 * FLAGS_threads * + FLAGS_ops_per_thread / elapsed_secs); + printf("Thread ops/sec = %u\n", ops_per_sec); + + printf("\nOperation latency (ns):\n"); + HistogramImpl combined; + for (uint32_t i = 0; i < FLAGS_threads; i++) { + combined.Merge(threads[i]->latency_ns_hist); + } + printf("%s", combined.ToString().c_str()); + + if (FLAGS_gather_stats) { + printf("\nGather stats latency (us):\n"); + printf("%s", stats_hist.ToString().c_str()); + } + + printf("\n%s", stats_report.c_str()); + return true; } @@ -257,8 +311,77 @@ class CacheBench { const uint64_t lookup_threshold_; const uint64_t erase_threshold_; - static void ThreadBody(void* v) { - ThreadState* thread = static_cast(v); + // A benchmark version of gathering stats on an active block cache by + // iterating over it. The primary purpose is to measure the impact of + // gathering stats with ApplyToAllEntries on throughput- and + // latency-sensitive Cache users. Performance of stats gathering is + // also reported. The last set of gathered stats is also reported, for + // manual sanity checking for logical errors or other unexpected + // behavior of cache_bench or the underlying Cache. + static void StatsBody(SharedState* shared, HistogramImpl* stats_hist, + std::string* stats_report) { + if (!FLAGS_gather_stats) { + return; + } + const auto clock = SystemClock::Default().get(); + uint64_t total_key_size = 0; + uint64_t total_charge = 0; + uint64_t total_entry_count = 0; + std::set deleters; + StopWatchNano timer(clock); + + for (;;) { + uint64_t time; + time = clock->NowMicros(); + uint64_t deadline = time + uint64_t{FLAGS_gather_stats_sleep_ms} * 1000; + + { + MutexLock l(shared->GetMutex()); + for (;;) { + if (shared->AllDone()) { + std::ostringstream ostr; + ostr << "Most recent cache entry stats:\n" + << "Number of entries: " << total_entry_count << "\n" + << "Total charge: " << BytesToHumanString(total_charge) << "\n" + << "Average key size: " + << (1.0 * total_key_size / total_entry_count) << "\n" + << "Average charge: " + << BytesToHumanString(1.0 * total_charge / total_entry_count) + << "\n" + << "Unique deleters: " << deleters.size() << "\n"; + *stats_report = ostr.str(); + return; + } + if (clock->NowMicros() >= deadline) { + break; + } + uint64_t diff = deadline - std::min(clock->NowMicros(), deadline); + shared->GetCondVar()->TimedWait(diff + 1); + } + } + + // Now gather stats, outside of mutex + total_key_size = 0; + total_charge = 0; + total_entry_count = 0; + deleters.clear(); + auto fn = [&](const Slice& key, void* /*value*/, size_t charge, + Cache::DeleterFn deleter) { + total_key_size += key.size(); + total_charge += charge; + ++total_entry_count; + // Something slightly more expensive as in (future) stats by category + deleters.insert(deleter); + }; + timer.Start(); + Cache::ApplyToAllEntriesOptions opts; + opts.average_entries_per_lock = FLAGS_gather_stats_entries_per_lock; + shared->GetCacheBench()->cache_->ApplyToAllEntries(fn, opts); + stats_hist->Add(timer.ElapsedNanos() / 1000); + } + } + + static void ThreadBody(ThreadState* thread) { SharedState* shared = thread->shared; { @@ -288,7 +411,12 @@ class CacheBench { // To hold handles for a non-trivial amount of time Cache::Handle* handle = nullptr; KeyGen gen; + const auto clock = SystemClock::Default().get(); + uint64_t start_time = clock->NowMicros(); + StopWatchNano timer(clock); + for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { + timer.Start(); Slice key = gen.GetRand(thread->rnd, max_key_); uint64_t random_op = thread->rnd.Next(); if (random_op < lookup_insert_threshold_) { @@ -305,7 +433,7 @@ class CacheBench { } else { // do insert cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes, - &deleter, &handle); + &deleter2, &handle); } } else if (random_op < insert_threshold_) { if (handle) { @@ -314,7 +442,7 @@ class CacheBench { } // do insert cache_->Insert(key, createValue(thread->rnd), FLAGS_value_bytes, - &deleter, &handle); + &deleter3, &handle); } else if (random_op < lookup_threshold_) { if (handle) { cache_->Release(handle); @@ -334,18 +462,26 @@ class CacheBench { // Should be extremely unlikely (noop) assert(random_op >= kHundredthUint64 * 100U); } + thread->latency_ns_hist.Add(timer.ElapsedNanos()); } if (handle) { cache_->Release(handle); handle = nullptr; } + // Ensure computations on `result` are not optimized away. + if (result == 1) { + printf("You are extremely unlucky(2). Try again.\n"); + exit(1); + } + thread->duration_us = clock->NowMicros() - start_time; } void PrintEnv() const { printf("RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); printf("Number of threads : %u\n", FLAGS_threads); printf("Ops per thread : %" PRIu64 "\n", FLAGS_ops_per_thread); - printf("Cache size : %" PRIu64 "\n", FLAGS_cache_size); + printf("Cache size : %s\n", + BytesToHumanString(FLAGS_cache_size).c_str()); printf("Num shard bits : %u\n", FLAGS_num_shard_bits); printf("Max key : %" PRIu64 "\n", max_key_); printf("Resident ratio : %g\n", FLAGS_resident_ratio); @@ -355,6 +491,14 @@ class CacheBench { printf("Insert percentage : %u%%\n", FLAGS_insert_percent); printf("Lookup percentage : %u%%\n", FLAGS_lookup_percent); printf("Erase percentage : %u%%\n", FLAGS_erase_percent); + std::ostringstream stats; + if (FLAGS_gather_stats) { + stats << "enabled (" << FLAGS_gather_stats_sleep_ms << "ms, " + << FLAGS_gather_stats_entries_per_lock << "/lock)"; + } else { + stats << "disabled"; + } + printf("Gather stats : %s\n", stats.str().c_str()); printf("----------------------------\n"); } }; diff --git a/cache/cache_test.cc b/cache/cache_test.cc index 40d8c42cc7..80f3ce3d1d 100644 --- a/cache/cache_test.cc +++ b/cache/cache_test.cc @@ -712,25 +712,98 @@ TEST_P(CacheTest, OverCapacity) { } namespace { -std::vector> callback_state; -void callback(void* entry, size_t charge) { - callback_state.push_back({DecodeValue(entry), static_cast(charge)}); +std::vector> legacy_callback_state; +void legacy_callback(void* value, size_t charge) { + legacy_callback_state.push_back( + {DecodeValue(value), static_cast(charge)}); } }; -TEST_P(CacheTest, ApplyToAllCacheEntiresTest) { +TEST_P(CacheTest, ApplyToAllCacheEntriesTest) { std::vector> inserted; - callback_state.clear(); + legacy_callback_state.clear(); for (int i = 0; i < 10; ++i) { Insert(i, i * 2, i + 1); inserted.push_back({i * 2, i + 1}); } - cache_->ApplyToAllCacheEntries(callback, true); + cache_->ApplyToAllCacheEntries(legacy_callback, true); + + std::sort(inserted.begin(), inserted.end()); + std::sort(legacy_callback_state.begin(), legacy_callback_state.end()); + ASSERT_EQ(inserted.size(), legacy_callback_state.size()); + for (size_t i = 0; i < inserted.size(); ++i) { + EXPECT_EQ(inserted[i], legacy_callback_state[i]); + } +} + +TEST_P(CacheTest, ApplyToAllEntriesTest) { + std::vector callback_state; + const auto callback = [&](const Slice& key, void* value, size_t charge, + Cache::DeleterFn deleter) { + callback_state.push_back(ToString(DecodeKey(key)) + "," + + ToString(DecodeValue(value)) + "," + + ToString(charge)); + assert(deleter == &CacheTest::Deleter); + }; + + std::vector inserted; + callback_state.clear(); + + for (int i = 0; i < 10; ++i) { + Insert(i, i * 2, i + 1); + inserted.push_back(ToString(i) + "," + ToString(i * 2) + "," + + ToString(i + 1)); + } + cache_->ApplyToAllEntries(callback, /*opts*/ {}); std::sort(inserted.begin(), inserted.end()); std::sort(callback_state.begin(), callback_state.end()); - ASSERT_TRUE(inserted == callback_state); + ASSERT_EQ(inserted.size(), callback_state.size()); + for (size_t i = 0; i < inserted.size(); ++i) { + EXPECT_EQ(inserted[i], callback_state[i]); + } +} + +TEST_P(CacheTest, ApplyToAllEntriesDuringResize) { + // This is a mini-stress test of ApplyToAllEntries, to ensure + // items in the cache that are neither added nor removed + // during ApplyToAllEntries are counted exactly once. + + // Insert some entries that we expect to be seen exactly once + // during iteration. + constexpr int kSpecialCharge = 2; + constexpr int kNotSpecialCharge = 1; + constexpr int kSpecialCount = 100; + for (int i = 0; i < kSpecialCount; ++i) { + Insert(i, i * 2, kSpecialCharge); + } + + // For callback + int special_count = 0; + const auto callback = [&](const Slice&, void*, size_t charge, + Cache::DeleterFn) { + if (charge == static_cast(kSpecialCharge)) { + ++special_count; + } + }; + + // Start counting + std::thread apply_thread([&]() { + // Use small average_entries_per_lock to make the problem difficult + Cache::ApplyToAllEntriesOptions opts; + opts.average_entries_per_lock = 2; + cache_->ApplyToAllEntries(callback, opts); + }); + + // In parallel, add more entries, enough to cause resize but not enough + // to cause ejections + for (int i = kSpecialCount * 1; i < kSpecialCount * 6; ++i) { + Insert(i, i * 2, kNotSpecialCharge); + } + + apply_thread.join(); + ASSERT_EQ(special_count, kSpecialCount); } TEST_P(CacheTest, DefaultShardBits) { diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc index afd42b5824..26287ff22c 100644 --- a/cache/clock_cache.cc +++ b/cache/clock_cache.cc @@ -284,8 +284,10 @@ class ClockCacheShard final : public CacheShard { size_t GetUsage() const override; size_t GetPinnedUsage() const override; void EraseUnRefEntries() override; - void ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) override; + void ApplyToSomeEntries( + const std::function& callback, + uint32_t average_entries_per_lock, uint32_t* state) override; private: static const uint32_t kInCacheBit = 1; @@ -404,22 +406,46 @@ size_t ClockCacheShard::GetPinnedUsage() const { return pinned_usage_.load(std::memory_order_relaxed); } -void ClockCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) { - if (thread_safe) { - mutex_.Lock(); +void ClockCacheShard::ApplyToSomeEntries( + const std::function& callback, + uint32_t average_entries_per_lock, uint32_t* state) { + assert(average_entries_per_lock > 0); + MutexLock lock(&mutex_); + + // Figure out the range to iterate, update `state` + size_t list_size = list_.size(); + size_t start_idx = *state; + size_t end_idx = start_idx + average_entries_per_lock; + if (start_idx > list_size) { + // Shouldn't reach here, but recoverable + assert(false); + // Mark finished with all + *state = UINT32_MAX; + return; } - for (auto& handle : list_) { - // Use relaxed semantics instead of acquire semantics since we are either - // holding mutex, or don't have thread safe requirement. + if (end_idx >= list_size || end_idx >= UINT32_MAX) { + // This also includes the hypothetical case of >4 billion + // cache handles. + end_idx = list_size; + // Mark finished with all + *state = UINT32_MAX; + } else { + *state = end_idx; + } + + // Do the iteration + auto cur = list_.begin() + start_idx; + auto end = list_.begin() + end_idx; + for (; cur != end; ++cur) { + const CacheHandle& handle = *cur; + // Use relaxed semantics instead of acquire semantics since we are + // holding mutex uint32_t flags = handle.flags.load(std::memory_order_relaxed); if (InCache(flags)) { - callback(handle.value, handle.charge); + callback(handle.key, handle.value, handle.charge, handle.deleter); } } - if (thread_safe) { - mutex_.Unlock(); - } } void ClockCacheShard::RecycleHandle(CacheHandle* handle, @@ -739,11 +765,11 @@ class ClockCache final : public ShardedCache { const char* Name() const override { return "ClockCache"; } - CacheShard* GetShard(int shard) override { + CacheShard* GetShard(uint32_t shard) override { return reinterpret_cast(&shards_[shard]); } - const CacheShard* GetShard(int shard) const override { + const CacheShard* GetShard(uint32_t shard) const override { return reinterpret_cast(&shards_[shard]); } diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index f421901216..04aa8facfa 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -10,24 +10,27 @@ #include "cache/lru_cache.h" #include +#include #include -#include #include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { -LRUHandleTable::LRUHandleTable() : list_(nullptr), length_(0), elems_(0) { - Resize(); -} +LRUHandleTable::LRUHandleTable(int max_upper_hash_bits) + : length_bits_(/* historical starting size*/ 4), + list_(new LRUHandle* [size_t{1} << length_bits_] {}), + elems_(0), + max_length_bits_(max_upper_hash_bits) {} LRUHandleTable::~LRUHandleTable() { - ApplyToAllCacheEntries([](LRUHandle* h) { - if (!h->HasRefs()) { - h->Free(); - } - }); - delete[] list_; + ApplyToEntriesRange( + [](LRUHandle* h) { + if (!h->HasRefs()) { + h->Free(); + } + }, + 0, uint32_t{1} << length_bits_); } LRUHandle* LRUHandleTable::Lookup(const Slice& key, uint32_t hash) { @@ -41,7 +44,7 @@ LRUHandle* LRUHandleTable::Insert(LRUHandle* h) { *ptr = h; if (old == nullptr) { ++elems_; - if (elems_ > length_) { + if ((elems_ >> length_bits_) > 0) { // elems_ >= length // Since each cache entry is fairly large, we aim for a small // average linked list length (<= 1). Resize(); @@ -61,7 +64,7 @@ LRUHandle* LRUHandleTable::Remove(const Slice& key, uint32_t hash) { } LRUHandle** LRUHandleTable::FindPointer(const Slice& key, uint32_t hash) { - LRUHandle** ptr = &list_[hash & (length_ - 1)]; + LRUHandle** ptr = &list_[hash >> (32 - length_bits_)]; while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) { ptr = &(*ptr)->next_hash; } @@ -69,19 +72,29 @@ LRUHandle** LRUHandleTable::FindPointer(const Slice& key, uint32_t hash) { } void LRUHandleTable::Resize() { - uint32_t new_length = 16; - while (new_length < elems_ * 1.5) { - new_length *= 2; + if (length_bits_ >= max_length_bits_) { + // Due to reaching limit of hash information, if we made the table + // bigger, we would allocate more addresses but only the same + // number would be used. + return; } - LRUHandle** new_list = new LRUHandle*[new_length]; - memset(new_list, 0, sizeof(new_list[0]) * new_length); + if (length_bits_ >= 31) { + // Avoid undefined behavior shifting uint32_t by 32 + return; + } + + uint32_t old_length = uint32_t{1} << length_bits_; + int new_length_bits = length_bits_ + 1; + std::unique_ptr new_list { + new LRUHandle* [size_t{1} << new_length_bits] {} + }; uint32_t count = 0; - for (uint32_t i = 0; i < length_; i++) { + for (uint32_t i = 0; i < old_length; i++) { LRUHandle* h = list_[i]; while (h != nullptr) { LRUHandle* next = h->next_hash; uint32_t hash = h->hash; - LRUHandle** ptr = &new_list[hash & (new_length - 1)]; + LRUHandle** ptr = &new_list[hash >> (32 - new_length_bits)]; h->next_hash = *ptr; *ptr = h; h = next; @@ -89,20 +102,21 @@ void LRUHandleTable::Resize() { } } assert(elems_ == count); - delete[] list_; - list_ = new_list; - length_ = new_length; + list_ = std::move(new_list); + length_bits_ = new_length_bits; } LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio, bool use_adaptive_mutex, - CacheMetadataChargePolicy metadata_charge_policy) + CacheMetadataChargePolicy metadata_charge_policy, + int max_upper_hash_bits) : capacity_(0), high_pri_pool_usage_(0), strict_capacity_limit_(strict_capacity_limit), high_pri_pool_ratio_(high_pri_pool_ratio), high_pri_pool_capacity_(0), + table_(max_upper_hash_bits), usage_(0), lru_usage_(0), mutex_(use_adaptive_mutex) { @@ -137,19 +151,37 @@ void LRUCacheShard::EraseUnRefEntries() { } } -void LRUCacheShard::ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) { - const auto applyCallback = [&]() { - table_.ApplyToAllCacheEntries( - [callback](LRUHandle* h) { callback(h->value, h->charge); }); - }; +void LRUCacheShard::ApplyToSomeEntries( + const std::function& callback, + uint32_t average_entries_per_lock, uint32_t* state) { + // The state is essentially going to be the starting hash, which works + // nicely even if we resize between calls because we use upper-most + // hash bits for table indexes. + MutexLock l(&mutex_); + uint32_t length_bits = table_.GetLengthBits(); + uint32_t length = uint32_t{1} << length_bits; - if (thread_safe) { - MutexLock l(&mutex_); - applyCallback(); + assert(average_entries_per_lock > 0); + // Assuming we are called with same average_entries_per_lock repeatedly, + // this simplifies some logic (index_end will not overflow) + assert(average_entries_per_lock < length || *state == 0); + + uint32_t index_begin = *state >> (32 - length_bits); + uint32_t index_end = index_begin + average_entries_per_lock; + if (index_end >= length) { + // Going to end + index_end = length; + *state = UINT32_MAX; } else { - applyCallback(); + *state = index_end << (32 - length_bits); } + + table_.ApplyToEntriesRange( + [callback](LRUHandle* h) { + callback(h->key(), h->value, h->charge, h->deleter); + }, + index_begin, index_end); } void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) { @@ -478,7 +510,8 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits, for (int i = 0; i < num_shards_; i++) { new (&shards_[i]) LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio, - use_adaptive_mutex, metadata_charge_policy); + use_adaptive_mutex, metadata_charge_policy, + /* max_upper_hash_bits */ 32 - num_shard_bits); } } @@ -492,11 +525,11 @@ LRUCache::~LRUCache() { } } -CacheShard* LRUCache::GetShard(int shard) { +CacheShard* LRUCache::GetShard(uint32_t shard) { return reinterpret_cast(&shards_[shard]); } -const CacheShard* LRUCache::GetShard(int shard) const { +const CacheShard* LRUCache::GetShard(uint32_t shard) const { return reinterpret_cast(&shards_[shard]); } diff --git a/cache/lru_cache.h b/cache/lru_cache.h index cee3f148b4..04b54738b1 100644 --- a/cache/lru_cache.h +++ b/cache/lru_cache.h @@ -8,10 +8,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include "cache/sharded_cache.h" - #include "port/malloc.h" #include "port/port.h" #include "util/autovector.h" @@ -153,7 +153,10 @@ struct LRUHandle { // 4.4.3's builtin hashtable. class LRUHandleTable { public: - LRUHandleTable(); + // If the table uses more hash bits than `max_upper_hash_bits`, + // it will eat into the bits used for sharding, which are constant + // for a given LRUHandleTable. + explicit LRUHandleTable(int max_upper_hash_bits); ~LRUHandleTable(); LRUHandle* Lookup(const Slice& key, uint32_t hash); @@ -161,8 +164,8 @@ class LRUHandleTable { LRUHandle* Remove(const Slice& key, uint32_t hash); template - void ApplyToAllCacheEntries(T func) { - for (uint32_t i = 0; i < length_; i++) { + void ApplyToEntriesRange(T func, uint32_t index_begin, uint32_t index_end) { + for (uint32_t i = index_begin; i < index_end; i++) { LRUHandle* h = list_[i]; while (h != nullptr) { auto n = h->next_hash; @@ -173,6 +176,8 @@ class LRUHandleTable { } } + int GetLengthBits() const { return length_bits_; } + private: // Return a pointer to slot that points to a cache entry that // matches key/hash. If there is no such cache entry, return a @@ -181,11 +186,19 @@ class LRUHandleTable { void Resize(); + // Number of hash bits (upper because lower bits used for sharding) + // used for table index. Length == 1 << length_bits_ + int length_bits_; + // The table consists of an array of buckets where each bucket is // a linked list of cache entries that hash into the bucket. - LRUHandle** list_; - uint32_t length_; + std::unique_ptr list_; + + // Number of elements currently in the table uint32_t elems_; + + // Set from max_upper_hash_bits (see constructor) + const int max_length_bits_; }; // A single shard of sharded cache. @@ -193,7 +206,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { public: LRUCacheShard(size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio, bool use_adaptive_mutex, - CacheMetadataChargePolicy metadata_charge_policy); + CacheMetadataChargePolicy metadata_charge_policy, + int max_upper_hash_bits); virtual ~LRUCacheShard() override = default; // Separate from constructor so caller can easily make an array of LRUCache @@ -226,8 +240,10 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard { virtual size_t GetUsage() const override; virtual size_t GetPinnedUsage() const override; - virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) override; + virtual void ApplyToSomeEntries( + const std::function& callback, + uint32_t average_entries_per_lock, uint32_t* state) override; virtual void EraseUnRefEntries() override; @@ -319,8 +335,8 @@ class LRUCache kDontChargeCacheMetadata); virtual ~LRUCache(); virtual const char* Name() const override { return "LRUCache"; } - virtual CacheShard* GetShard(int shard) override; - virtual const CacheShard* GetShard(int shard) const override; + virtual CacheShard* GetShard(uint32_t shard) override; + virtual const CacheShard* GetShard(uint32_t shard) const override; virtual void* Value(Handle* handle) override; virtual size_t GetCharge(Handle* handle) const override; virtual uint32_t GetHash(Handle* handle) const override; diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc index b30227e7b9..d9c0d064d6 100644 --- a/cache/lru_cache_test.cc +++ b/cache/lru_cache_test.cc @@ -30,9 +30,10 @@ class LRUCacheTest : public testing::Test { DeleteCache(); cache_ = reinterpret_cast( port::cacheline_aligned_alloc(sizeof(LRUCacheShard))); - new (cache_) LRUCacheShard(capacity, false /*strict_capacity_limit*/, - high_pri_pool_ratio, use_adaptive_mutex, - kDontChargeCacheMetadata); + new (cache_) + LRUCacheShard(capacity, false /*strict_capacity_limit*/, + high_pri_pool_ratio, use_adaptive_mutex, + kDontChargeCacheMetadata, 24 /*max_upper_hash_bits*/); } void Insert(const std::string& key, diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc index 6c915df8cc..8d909ce49e 100644 --- a/cache/sharded_cache.cc +++ b/cache/sharded_cache.cc @@ -9,35 +9,47 @@ #include "cache/sharded_cache.h" -#include +#include +#include +#include +#include "util/hash.h" +#include "util/math.h" #include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { +namespace { + +inline uint32_t HashSlice(const Slice& s) { + return Lower32of64(GetSliceNPHash64(s)); +} + +} // namespace + ShardedCache::ShardedCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit, std::shared_ptr allocator) : Cache(std::move(allocator)), - num_shard_bits_(num_shard_bits), + shard_mask_((uint32_t{1} << num_shard_bits) - 1), capacity_(capacity), strict_capacity_limit_(strict_capacity_limit), last_id_(1) {} void ShardedCache::SetCapacity(size_t capacity) { - int num_shards = 1 << num_shard_bits_; + uint32_t num_shards = GetNumShards(); const size_t per_shard = (capacity + (num_shards - 1)) / num_shards; MutexLock l(&capacity_mutex_); - for (int s = 0; s < num_shards; s++) { + for (uint32_t s = 0; s < num_shards; s++) { GetShard(s)->SetCapacity(per_shard); } capacity_ = capacity; } void ShardedCache::SetStrictCapacityLimit(bool strict_capacity_limit) { - int num_shards = 1 << num_shard_bits_; + uint32_t num_shards = GetNumShards(); MutexLock l(&capacity_mutex_); - for (int s = 0; s < num_shards; s++) { + for (uint32_t s = 0; s < num_shards; s++) { GetShard(s)->SetStrictCapacityLimit(strict_capacity_limit); } strict_capacity_limit_ = strict_capacity_limit; @@ -87,9 +99,9 @@ bool ShardedCache::HasStrictCapacityLimit() const { size_t ShardedCache::GetUsage() const { // We will not lock the cache when getting the usage from shards. - int num_shards = 1 << num_shard_bits_; + uint32_t num_shards = GetNumShards(); size_t usage = 0; - for (int s = 0; s < num_shards; s++) { + for (uint32_t s = 0; s < num_shards; s++) { usage += GetShard(s)->GetUsage(); } return usage; @@ -101,25 +113,42 @@ size_t ShardedCache::GetUsage(Handle* handle) const { size_t ShardedCache::GetPinnedUsage() const { // We will not lock the cache when getting the usage from shards. - int num_shards = 1 << num_shard_bits_; + uint32_t num_shards = GetNumShards(); size_t usage = 0; - for (int s = 0; s < num_shards; s++) { + for (uint32_t s = 0; s < num_shards; s++) { usage += GetShard(s)->GetPinnedUsage(); } return usage; } -void ShardedCache::ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) { - int num_shards = 1 << num_shard_bits_; - for (int s = 0; s < num_shards; s++) { - GetShard(s)->ApplyToAllCacheEntries(callback, thread_safe); - } +void ShardedCache::ApplyToAllEntries( + const std::function& callback, + const ApplyToAllEntriesOptions& opts) { + uint32_t num_shards = GetNumShards(); + // Iterate over part of each shard, rotating between shards, to + // minimize impact on latency of concurrent operations. + std::unique_ptr states(new uint32_t[num_shards]{}); + + uint32_t aepl_in_32 = static_cast( + std::min(size_t{UINT32_MAX}, opts.average_entries_per_lock)); + aepl_in_32 = std::min(aepl_in_32, uint32_t{1}); + + bool remaining_work; + do { + remaining_work = false; + for (uint32_t s = 0; s < num_shards; s++) { + if (states[s] != UINT32_MAX) { + GetShard(s)->ApplyToSomeEntries(callback, aepl_in_32, &states[s]); + remaining_work |= states[s] != UINT32_MAX; + } + } + } while (remaining_work); } void ShardedCache::EraseUnRefEntries() { - int num_shards = 1 << num_shard_bits_; - for (int s = 0; s < num_shards; s++) { + uint32_t num_shards = GetNumShards(); + for (uint32_t s = 0; s < num_shards; s++) { GetShard(s)->EraseUnRefEntries(); } } @@ -134,7 +163,8 @@ std::string ShardedCache::GetPrintableOptions() const { snprintf(buffer, kBufferSize, " capacity : %" ROCKSDB_PRIszt "\n", capacity_); ret.append(buffer); - snprintf(buffer, kBufferSize, " num_shard_bits : %d\n", num_shard_bits_); + snprintf(buffer, kBufferSize, " num_shard_bits : %d\n", + GetNumShardBits()); ret.append(buffer); snprintf(buffer, kBufferSize, " strict_capacity_limit : %d\n", strict_capacity_limit_); @@ -159,4 +189,8 @@ int GetDefaultCacheShardBits(size_t capacity) { return num_shard_bits; } +int ShardedCache::GetNumShardBits() const { return BitsSetToOne(shard_mask_); } + +uint32_t ShardedCache::GetNumShards() const { return shard_mask_ + 1; } + } // namespace ROCKSDB_NAMESPACE diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h index ce9e459dc1..54c9caf5be 100644 --- a/cache/sharded_cache.h +++ b/cache/sharded_cache.h @@ -14,7 +14,6 @@ #include "port/port.h" #include "rocksdb/cache.h" -#include "util/hash.h" namespace ROCKSDB_NAMESPACE { @@ -24,9 +23,9 @@ class CacheShard { CacheShard() = default; virtual ~CacheShard() = default; + using DeleterFn = Cache::DeleterFn; virtual Status Insert(const Slice& key, uint32_t hash, void* value, - size_t charge, - void (*deleter)(const Slice& key, void* value), + size_t charge, DeleterFn deleter, Cache::Handle** handle, Cache::Priority priority) = 0; virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0; virtual bool Ref(Cache::Handle* handle) = 0; @@ -36,8 +35,14 @@ class CacheShard { virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0; virtual size_t GetUsage() const = 0; virtual size_t GetPinnedUsage() const = 0; - virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) = 0; + // Handles iterating over roughly `average_entries_per_lock` entries, using + // `state` to somehow record where it last ended up. Caller initially uses + // *state == 0 and implementation sets *state = UINT32_MAX to indicate + // completion. + virtual void ApplyToSomeEntries( + const std::function& callback, + uint32_t average_entries_per_lock, uint32_t* state) = 0; virtual void EraseUnRefEntries() = 0; virtual std::string GetPrintableOptions() const { return ""; } void set_metadata_charge_policy( @@ -58,8 +63,8 @@ class ShardedCache : public Cache { std::shared_ptr memory_allocator = nullptr); virtual ~ShardedCache() = default; virtual const char* Name() const override = 0; - virtual CacheShard* GetShard(int shard) = 0; - virtual const CacheShard* GetShard(int shard) const = 0; + virtual CacheShard* GetShard(uint32_t shard) = 0; + virtual const CacheShard* GetShard(uint32_t shard) const = 0; virtual void* Value(Handle* handle) override = 0; virtual size_t GetCharge(Handle* handle) const override = 0; @@ -70,8 +75,8 @@ class ShardedCache : public Cache { virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override; virtual Status Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value), - Handle** handle, Priority priority) override; + DeleterFn deleter, Handle** handle, + Priority priority) override; virtual Handle* Lookup(const Slice& key, Statistics* stats) override; virtual bool Ref(Handle* handle) override; virtual bool Release(Handle* handle, bool force_erase = false) override; @@ -82,24 +87,20 @@ class ShardedCache : public Cache { virtual size_t GetUsage() const override; virtual size_t GetUsage(Handle* handle) const override; virtual size_t GetPinnedUsage() const override; - virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) override; + virtual void ApplyToAllEntries( + const std::function& callback, + const ApplyToAllEntriesOptions& opts) override; virtual void EraseUnRefEntries() override; virtual std::string GetPrintableOptions() const override; - int GetNumShardBits() const { return num_shard_bits_; } + int GetNumShardBits() const; + uint32_t GetNumShards() const; private: - static inline uint32_t HashSlice(const Slice& s) { - return static_cast(GetSliceNPHash64(s)); - } + inline uint32_t Shard(uint32_t hash) { return hash & shard_mask_; } - uint32_t Shard(uint32_t hash) { - // Note, hash >> 32 yields hash in gcc, not the zero we expect! - return (num_shard_bits_ > 0) ? (hash >> (32 - num_shard_bits_)) : 0; - } - - int num_shard_bits_; + const uint32_t shard_mask_; mutable port::Mutex capacity_mutex_; size_t capacity_; bool strict_capacity_limit_; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 697fb6ffba..baa0af8ba5 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -503,6 +503,10 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) { options.new_table_reader_for_compaction_inputs = true; options.max_open_files = 20; options.level0_file_num_compaction_trigger = 3; + // Avoid many shards with small max_open_files, where as little as + // two table insertions could lead to an LRU eviction, depending on + // hash values. + options.table_cache_numshardbits = 2; DestroyAndReopen(options); Random rnd(301); diff --git a/db/db_test_util.h b/db/db_test_util.h index 0c294f786d..a11457d9d4 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -877,6 +877,13 @@ class CacheWrapper : public Cache { target_->ApplyToAllCacheEntries(callback, thread_safe); } + void ApplyToAllEntries( + const std::function& callback, + const ApplyToAllEntriesOptions& opts) override { + target_->ApplyToAllEntries(callback, opts); + } + void EraseUnRefEntries() override { target_->EraseUnRefEntries(); } protected: diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h index 008b84b95a..21e9d2a5d6 100644 --- a/include/rocksdb/cache.h +++ b/include/rocksdb/cache.h @@ -22,9 +22,11 @@ #pragma once -#include +#include +#include #include #include + #include "rocksdb/memory_allocator.h" #include "rocksdb/slice.h" #include "rocksdb/statistics.h" @@ -171,6 +173,11 @@ class Cache { // Opaque handle to an entry stored in the cache. struct Handle {}; + // A function pointer type for custom destruction of an entry's + // value. The Cache is responsible for copying and reclaiming space + // for the key, but values are managed by the caller. + using DeleterFn = void (*)(const Slice& key, void* value); + // The type of the Cache virtual const char* Name() const = 0; @@ -188,10 +195,11 @@ class Cache { // insert. In case of error value will be cleanup. // // When the inserted entry is no longer needed, the key and - // value will be passed to "deleter". + // value will be passed to "deleter" which must delete the value. + // (The Cache is responsible for copying and reclaiming space for + // the key.) virtual Status Insert(const Slice& key, void* value, size_t charge, - void (*deleter)(const Slice& key, void* value), - Handle** handle = nullptr, + DeleterFn deleter, Handle** handle = nullptr, Priority priority = Priority::LOW) = 0; // If the cache has no mapping for "key", returns nullptr. @@ -277,11 +285,33 @@ class Cache { // default implementation is noop } - // Apply callback to all entries in the cache - // If thread_safe is true, it will also lock the accesses. Otherwise, it will - // access the cache without the lock held - virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t), - bool thread_safe) = 0; + struct ApplyToAllEntriesOptions { + // If the Cache uses locks, setting `average_entries_per_lock` to + // a higher value suggests iterating over more entries each time a lock + // is acquired, likely reducing the time for ApplyToAllEntries but + // increasing latency for concurrent users of the Cache. Setting + // `average_entries_per_lock` to a smaller value could be helpful if + // callback is relatively expensive, such as using large data structures. + size_t average_entries_per_lock = 256; + }; + + // Apply a callback to all entries in the cache. The Cache must ensure + // thread safety but does not guarantee that a consistent snapshot of all + // entries is iterated over if other threads are operating on the Cache + // also. + virtual void ApplyToAllEntries( + const std::function& callback, + const ApplyToAllEntriesOptions& opts) = 0; + + // DEPRECATED version of above. (Default implementation uses above.) + virtual void ApplyToAllCacheEntries(void (*callback)(void* value, + size_t charge), + bool /*thread_safe*/) { + ApplyToAllEntries([callback](const Slice&, void* value, size_t charge, + DeleterFn) { callback(value, charge); }, + {}); + } // Remove all entries. // Prerequisite: no entry is referenced. diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc index 0da71ab166..b0d619eb30 100644 --- a/utilities/simulator_cache/sim_cache.cc +++ b/utilities/simulator_cache/sim_cache.cc @@ -255,6 +255,13 @@ class SimCacheImpl : public SimCache { cache_->ApplyToAllCacheEntries(callback, thread_safe); } + void ApplyToAllEntries( + const std::function& callback, + const ApplyToAllEntriesOptions& opts) override { + cache_->ApplyToAllEntries(callback, opts); + } + void EraseUnRefEntries() override { cache_->EraseUnRefEntries(); key_only_cache_->EraseUnRefEntries();