mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-29 18:33:58 +00:00
1aac814578
Summary: folly DistributedMutex is faster than standard mutexes though imposes some static obligations on usage. See https://github.com/facebook/folly/blob/main/folly/synchronization/DistributedMutex.h for details. Here we use this alternative for our Cache implementations (especially LRUCache) for better locking performance, when RocksDB is compiled with folly. Also added information about which distributed mutex implementation is being used to cache_bench output and to DB LOG. Intended follow-up: * Use DMutex in more places, perhaps improving API to support non-scoped locking * Fix linking with fbcode compiler (needs ROCKSDB_NO_FBCODE=1 currently) Credit: Thanks Siying for reminding me about this line of work that was previously left unfinished. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10179 Test Plan: for correctness, existing tests. CircleCI config updated. Also Meta-internal buck build updated. For performance, ran simultaneous before & after cache_bench. Out of three comparison runs, the middle improvement to ops/sec was +21%: Baseline: USE_CLANG=1 DEBUG_LEVEL=0 make -j24 cache_bench (fbcode compiler) ``` Complete in 20.201 s; Rough parallel ops/sec = 1584062 Thread ops/sec = 107176 Operation latency (ns): Count: 32000000 Average: 9257.9421 StdDev: 122412.04 Min: 134 Median: 3623.0493 Max: 56918500 Percentiles: P50: 3623.05 P75: 10288.02 P99: 30219.35 P99.9: 683522.04 P99.99: 7302791.63 ``` New: (add USE_FOLLY=1) ``` Complete in 16.674 s; Rough parallel ops/sec = 1919135 (+21%) Thread ops/sec = 135487 Operation latency (ns): Count: 32000000 Average: 7304.9294 StdDev: 108530.28 Min: 132 Median: 3777.6012 Max: 91030902 Percentiles: P50: 3777.60 P75: 10169.89 P99: 24504.51 P99.9: 59721.59 P99.99: 1861151.83 ``` Reviewed By: anand1976 Differential Revision: D37182983 Pulled By: pdillinger fbshipit-source-id: a17eb05f25b832b6a2c1356f5c657e831a5af8d1
802 lines
26 KiB
C++
802 lines
26 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "cache/lru_cache.h"
|
|
|
|
#include <cassert>
|
|
#include <cstdint>
|
|
#include <cstdio>
|
|
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/statistics.h"
|
|
#include "port/lang.h"
|
|
#include "util/distributed_mutex.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
namespace lru_cache {
|
|
|
|
LRUHandleTable::LRUHandleTable(int max_upper_hash_bits)
|
|
: length_bits_(/* historical starting size*/ 4),
|
|
list_(new LRUHandle* [size_t{1} << length_bits_] {}),
|
|
elems_(0),
|
|
max_length_bits_(max_upper_hash_bits) {}
|
|
|
|
LRUHandleTable::~LRUHandleTable() {
|
|
ApplyToEntriesRange(
|
|
[](LRUHandle* h) {
|
|
if (!h->HasRefs()) {
|
|
h->Free();
|
|
}
|
|
},
|
|
0, uint32_t{1} << length_bits_);
|
|
}
|
|
|
|
LRUHandle* LRUHandleTable::Lookup(const Slice& key, uint32_t hash) {
|
|
return *FindPointer(key, hash);
|
|
}
|
|
|
|
LRUHandle* LRUHandleTable::Insert(LRUHandle* h) {
|
|
LRUHandle** ptr = FindPointer(h->key(), h->hash);
|
|
LRUHandle* old = *ptr;
|
|
h->next_hash = (old == nullptr ? nullptr : old->next_hash);
|
|
*ptr = h;
|
|
if (old == nullptr) {
|
|
++elems_;
|
|
if ((elems_ >> length_bits_) > 0) { // elems_ >= length
|
|
// Since each cache entry is fairly large, we aim for a small
|
|
// average linked list length (<= 1).
|
|
Resize();
|
|
}
|
|
}
|
|
return old;
|
|
}
|
|
|
|
LRUHandle* LRUHandleTable::Remove(const Slice& key, uint32_t hash) {
|
|
LRUHandle** ptr = FindPointer(key, hash);
|
|
LRUHandle* result = *ptr;
|
|
if (result != nullptr) {
|
|
*ptr = result->next_hash;
|
|
--elems_;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
LRUHandle** LRUHandleTable::FindPointer(const Slice& key, uint32_t hash) {
|
|
LRUHandle** ptr = &list_[hash >> (32 - length_bits_)];
|
|
while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
|
|
ptr = &(*ptr)->next_hash;
|
|
}
|
|
return ptr;
|
|
}
|
|
|
|
void LRUHandleTable::Resize() {
|
|
if (length_bits_ >= max_length_bits_) {
|
|
// Due to reaching limit of hash information, if we made the table bigger,
|
|
// we would allocate more addresses but only the same number would be used.
|
|
return;
|
|
}
|
|
if (length_bits_ >= 31) {
|
|
// Avoid undefined behavior shifting uint32_t by 32.
|
|
return;
|
|
}
|
|
|
|
uint32_t old_length = uint32_t{1} << length_bits_;
|
|
int new_length_bits = length_bits_ + 1;
|
|
std::unique_ptr<LRUHandle* []> new_list {
|
|
new LRUHandle* [size_t{1} << new_length_bits] {}
|
|
};
|
|
uint32_t count = 0;
|
|
for (uint32_t i = 0; i < old_length; i++) {
|
|
LRUHandle* h = list_[i];
|
|
while (h != nullptr) {
|
|
LRUHandle* next = h->next_hash;
|
|
uint32_t hash = h->hash;
|
|
LRUHandle** ptr = &new_list[hash >> (32 - new_length_bits)];
|
|
h->next_hash = *ptr;
|
|
*ptr = h;
|
|
h = next;
|
|
count++;
|
|
}
|
|
}
|
|
assert(elems_ == count);
|
|
list_ = std::move(new_list);
|
|
length_bits_ = new_length_bits;
|
|
}
|
|
|
|
LRUCacheShard::LRUCacheShard(
|
|
size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio,
|
|
bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy,
|
|
int max_upper_hash_bits,
|
|
const std::shared_ptr<SecondaryCache>& secondary_cache)
|
|
: capacity_(0),
|
|
high_pri_pool_usage_(0),
|
|
strict_capacity_limit_(strict_capacity_limit),
|
|
high_pri_pool_ratio_(high_pri_pool_ratio),
|
|
high_pri_pool_capacity_(0),
|
|
table_(max_upper_hash_bits),
|
|
usage_(0),
|
|
lru_usage_(0),
|
|
mutex_(use_adaptive_mutex),
|
|
secondary_cache_(secondary_cache) {
|
|
set_metadata_charge_policy(metadata_charge_policy);
|
|
// Make empty circular linked list.
|
|
lru_.next = &lru_;
|
|
lru_.prev = &lru_;
|
|
lru_low_pri_ = &lru_;
|
|
SetCapacity(capacity);
|
|
}
|
|
|
|
void LRUCacheShard::EraseUnRefEntries() {
|
|
autovector<LRUHandle*> last_reference_list;
|
|
{
|
|
DMutexLock l(mutex_);
|
|
while (lru_.next != &lru_) {
|
|
LRUHandle* old = lru_.next;
|
|
// LRU list contains only elements which can be evicted.
|
|
assert(old->InCache() && !old->HasRefs());
|
|
LRU_Remove(old);
|
|
table_.Remove(old->key(), old->hash);
|
|
old->SetInCache(false);
|
|
assert(usage_ >= old->total_charge);
|
|
usage_ -= old->total_charge;
|
|
last_reference_list.push_back(old);
|
|
}
|
|
}
|
|
|
|
for (auto entry : last_reference_list) {
|
|
entry->Free();
|
|
}
|
|
}
|
|
|
|
void LRUCacheShard::ApplyToSomeEntries(
|
|
const std::function<void(const Slice& key, void* value, size_t charge,
|
|
DeleterFn deleter)>& callback,
|
|
uint32_t average_entries_per_lock, uint32_t* state) {
|
|
// The state is essentially going to be the starting hash, which works
|
|
// nicely even if we resize between calls because we use upper-most
|
|
// hash bits for table indexes.
|
|
DMutexLock l(mutex_);
|
|
uint32_t length_bits = table_.GetLengthBits();
|
|
uint32_t length = uint32_t{1} << length_bits;
|
|
|
|
assert(average_entries_per_lock > 0);
|
|
// Assuming we are called with same average_entries_per_lock repeatedly,
|
|
// this simplifies some logic (index_end will not overflow).
|
|
assert(average_entries_per_lock < length || *state == 0);
|
|
|
|
uint32_t index_begin = *state >> (32 - length_bits);
|
|
uint32_t index_end = index_begin + average_entries_per_lock;
|
|
if (index_end >= length) {
|
|
// Going to end
|
|
index_end = length;
|
|
*state = UINT32_MAX;
|
|
} else {
|
|
*state = index_end << (32 - length_bits);
|
|
}
|
|
|
|
table_.ApplyToEntriesRange(
|
|
[callback,
|
|
metadata_charge_policy = metadata_charge_policy_](LRUHandle* h) {
|
|
DeleterFn deleter = h->IsSecondaryCacheCompatible()
|
|
? h->info_.helper->del_cb
|
|
: h->info_.deleter;
|
|
callback(h->key(), h->value, h->GetCharge(metadata_charge_policy),
|
|
deleter);
|
|
},
|
|
index_begin, index_end);
|
|
}
|
|
|
|
void LRUCacheShard::TEST_GetLRUList(LRUHandle** lru, LRUHandle** lru_low_pri) {
|
|
DMutexLock l(mutex_);
|
|
*lru = &lru_;
|
|
*lru_low_pri = lru_low_pri_;
|
|
}
|
|
|
|
size_t LRUCacheShard::TEST_GetLRUSize() {
|
|
DMutexLock l(mutex_);
|
|
LRUHandle* lru_handle = lru_.next;
|
|
size_t lru_size = 0;
|
|
while (lru_handle != &lru_) {
|
|
lru_size++;
|
|
lru_handle = lru_handle->next;
|
|
}
|
|
return lru_size;
|
|
}
|
|
|
|
double LRUCacheShard::GetHighPriPoolRatio() {
|
|
DMutexLock l(mutex_);
|
|
return high_pri_pool_ratio_;
|
|
}
|
|
|
|
void LRUCacheShard::LRU_Remove(LRUHandle* e) {
|
|
assert(e->next != nullptr);
|
|
assert(e->prev != nullptr);
|
|
if (lru_low_pri_ == e) {
|
|
lru_low_pri_ = e->prev;
|
|
}
|
|
e->next->prev = e->prev;
|
|
e->prev->next = e->next;
|
|
e->prev = e->next = nullptr;
|
|
assert(lru_usage_ >= e->total_charge);
|
|
lru_usage_ -= e->total_charge;
|
|
if (e->InHighPriPool()) {
|
|
assert(high_pri_pool_usage_ >= e->total_charge);
|
|
high_pri_pool_usage_ -= e->total_charge;
|
|
}
|
|
}
|
|
|
|
void LRUCacheShard::LRU_Insert(LRUHandle* e) {
|
|
assert(e->next == nullptr);
|
|
assert(e->prev == nullptr);
|
|
if (high_pri_pool_ratio_ > 0 && (e->IsHighPri() || e->HasHit())) {
|
|
// Inset "e" to head of LRU list.
|
|
e->next = &lru_;
|
|
e->prev = lru_.prev;
|
|
e->prev->next = e;
|
|
e->next->prev = e;
|
|
e->SetInHighPriPool(true);
|
|
high_pri_pool_usage_ += e->total_charge;
|
|
MaintainPoolSize();
|
|
} else {
|
|
// Insert "e" to the head of low-pri pool. Note that when
|
|
// high_pri_pool_ratio is 0, head of low-pri pool is also head of LRU list.
|
|
e->next = lru_low_pri_->next;
|
|
e->prev = lru_low_pri_;
|
|
e->prev->next = e;
|
|
e->next->prev = e;
|
|
e->SetInHighPriPool(false);
|
|
lru_low_pri_ = e;
|
|
}
|
|
lru_usage_ += e->total_charge;
|
|
}
|
|
|
|
void LRUCacheShard::MaintainPoolSize() {
|
|
while (high_pri_pool_usage_ > high_pri_pool_capacity_) {
|
|
// Overflow last entry in high-pri pool to low-pri pool.
|
|
lru_low_pri_ = lru_low_pri_->next;
|
|
assert(lru_low_pri_ != &lru_);
|
|
lru_low_pri_->SetInHighPriPool(false);
|
|
assert(high_pri_pool_usage_ >= lru_low_pri_->total_charge);
|
|
high_pri_pool_usage_ -= lru_low_pri_->total_charge;
|
|
}
|
|
}
|
|
|
|
void LRUCacheShard::EvictFromLRU(size_t charge,
|
|
autovector<LRUHandle*>* deleted) {
|
|
while ((usage_ + charge) > capacity_ && lru_.next != &lru_) {
|
|
LRUHandle* old = lru_.next;
|
|
// LRU list contains only elements which can be evicted.
|
|
assert(old->InCache() && !old->HasRefs());
|
|
LRU_Remove(old);
|
|
table_.Remove(old->key(), old->hash);
|
|
old->SetInCache(false);
|
|
assert(usage_ >= old->total_charge);
|
|
usage_ -= old->total_charge;
|
|
deleted->push_back(old);
|
|
}
|
|
}
|
|
|
|
void LRUCacheShard::SetCapacity(size_t capacity) {
|
|
autovector<LRUHandle*> last_reference_list;
|
|
{
|
|
DMutexLock l(mutex_);
|
|
capacity_ = capacity;
|
|
high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_;
|
|
EvictFromLRU(0, &last_reference_list);
|
|
}
|
|
|
|
// Try to insert the evicted entries into tiered cache.
|
|
// Free the entries outside of mutex for performance reasons.
|
|
for (auto entry : last_reference_list) {
|
|
if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
|
|
!entry->IsInSecondaryCache()) {
|
|
secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
|
|
.PermitUncheckedError();
|
|
}
|
|
entry->Free();
|
|
}
|
|
}
|
|
|
|
void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
|
|
DMutexLock l(mutex_);
|
|
strict_capacity_limit_ = strict_capacity_limit;
|
|
}
|
|
|
|
Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
|
|
bool free_handle_on_fail) {
|
|
Status s = Status::OK();
|
|
autovector<LRUHandle*> last_reference_list;
|
|
|
|
{
|
|
DMutexLock l(mutex_);
|
|
|
|
// Free the space following strict LRU policy until enough space
|
|
// is freed or the lru list is empty.
|
|
EvictFromLRU(e->total_charge, &last_reference_list);
|
|
|
|
if ((usage_ + e->total_charge) > capacity_ &&
|
|
(strict_capacity_limit_ || handle == nullptr)) {
|
|
e->SetInCache(false);
|
|
if (handle == nullptr) {
|
|
// Don't insert the entry but still return ok, as if the entry inserted
|
|
// into cache and get evicted immediately.
|
|
last_reference_list.push_back(e);
|
|
} else {
|
|
if (free_handle_on_fail) {
|
|
delete[] reinterpret_cast<char*>(e);
|
|
*handle = nullptr;
|
|
}
|
|
s = Status::Incomplete("Insert failed due to LRU cache being full.");
|
|
}
|
|
} else {
|
|
// Insert into the cache. Note that the cache might get larger than its
|
|
// capacity if not enough space was freed up.
|
|
LRUHandle* old = table_.Insert(e);
|
|
usage_ += e->total_charge;
|
|
if (old != nullptr) {
|
|
s = Status::OkOverwritten();
|
|
assert(old->InCache());
|
|
old->SetInCache(false);
|
|
if (!old->HasRefs()) {
|
|
// old is on LRU because it's in cache and its reference count is 0.
|
|
LRU_Remove(old);
|
|
assert(usage_ >= old->total_charge);
|
|
usage_ -= old->total_charge;
|
|
last_reference_list.push_back(old);
|
|
}
|
|
}
|
|
if (handle == nullptr) {
|
|
LRU_Insert(e);
|
|
} else {
|
|
// If caller already holds a ref, no need to take one here.
|
|
if (!e->HasRefs()) {
|
|
e->Ref();
|
|
}
|
|
*handle = reinterpret_cast<Cache::Handle*>(e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Try to insert the evicted entries into the secondary cache.
|
|
// Free the entries here outside of mutex for performance reasons.
|
|
for (auto entry : last_reference_list) {
|
|
if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
|
|
!entry->IsInSecondaryCache()) {
|
|
secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
|
|
.PermitUncheckedError();
|
|
}
|
|
entry->Free();
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
void LRUCacheShard::Promote(LRUHandle* e) {
|
|
SecondaryCacheResultHandle* secondary_handle = e->sec_handle;
|
|
|
|
assert(secondary_handle->IsReady());
|
|
e->SetIncomplete(false);
|
|
e->SetInCache(true);
|
|
e->value = secondary_handle->Value();
|
|
e->CalcTotalCharge(secondary_handle->Size(), metadata_charge_policy_);
|
|
delete secondary_handle;
|
|
|
|
// This call could fail if the cache is over capacity and
|
|
// strict_capacity_limit_ is true. In such a case, we don't want
|
|
// InsertItem() to free the handle, since the item is already in memory
|
|
// and the caller will most likely just read from disk if we erase it here.
|
|
if (e->value) {
|
|
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(e);
|
|
Status s = InsertItem(e, &handle, /*free_handle_on_fail=*/false);
|
|
if (!s.ok()) {
|
|
// Item is in memory, but not accounted against the cache capacity.
|
|
// When the handle is released, the item should get deleted.
|
|
assert(!e->InCache());
|
|
}
|
|
} else {
|
|
// Since the secondary cache lookup failed, mark the item as not in cache
|
|
// Don't charge the cache as its only metadata that'll shortly be released
|
|
DMutexLock l(mutex_);
|
|
// TODO
|
|
e->CalcTotalCharge(0, metadata_charge_policy_);
|
|
e->SetInCache(false);
|
|
}
|
|
}
|
|
|
|
Cache::Handle* LRUCacheShard::Lookup(
|
|
const Slice& key, uint32_t hash,
|
|
const ShardedCache::CacheItemHelper* helper,
|
|
const ShardedCache::CreateCallback& create_cb, Cache::Priority priority,
|
|
bool wait, Statistics* stats) {
|
|
LRUHandle* e = nullptr;
|
|
{
|
|
DMutexLock l(mutex_);
|
|
e = table_.Lookup(key, hash);
|
|
if (e != nullptr) {
|
|
assert(e->InCache());
|
|
if (!e->HasRefs()) {
|
|
// The entry is in LRU since it's in hash and has no external references
|
|
LRU_Remove(e);
|
|
}
|
|
e->Ref();
|
|
e->SetHit();
|
|
}
|
|
}
|
|
|
|
// If handle table lookup failed, then allocate a handle outside the
|
|
// mutex if we're going to lookup in the secondary cache.
|
|
// Only support synchronous for now.
|
|
// TODO: Support asynchronous lookup in secondary cache
|
|
if (!e && secondary_cache_ && helper && helper->saveto_cb) {
|
|
// For objects from the secondary cache, we expect the caller to provide
|
|
// a way to create/delete the primary cache object. The only case where
|
|
// a deleter would not be required is for dummy entries inserted for
|
|
// accounting purposes, which we won't demote to the secondary cache
|
|
// anyway.
|
|
assert(create_cb && helper->del_cb);
|
|
bool is_in_sec_cache{false};
|
|
std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
|
|
secondary_cache_->Lookup(key, create_cb, wait, is_in_sec_cache);
|
|
if (secondary_handle != nullptr) {
|
|
e = reinterpret_cast<LRUHandle*>(
|
|
new char[sizeof(LRUHandle) - 1 + key.size()]);
|
|
|
|
e->flags = 0;
|
|
e->SetSecondaryCacheCompatible(true);
|
|
e->info_.helper = helper;
|
|
e->key_length = key.size();
|
|
e->hash = hash;
|
|
e->refs = 0;
|
|
e->next = e->prev = nullptr;
|
|
e->SetPriority(priority);
|
|
memcpy(e->key_data, key.data(), key.size());
|
|
e->value = nullptr;
|
|
e->sec_handle = secondary_handle.release();
|
|
e->Ref();
|
|
|
|
if (wait) {
|
|
Promote(e);
|
|
e->SetIsInSecondaryCache(is_in_sec_cache);
|
|
if (!e->value) {
|
|
// The secondary cache returned a handle, but the lookup failed.
|
|
e->Unref();
|
|
e->Free();
|
|
e = nullptr;
|
|
} else {
|
|
PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
|
|
RecordTick(stats, SECONDARY_CACHE_HITS);
|
|
}
|
|
} else {
|
|
// If wait is false, we always return a handle and let the caller
|
|
// release the handle after checking for success or failure.
|
|
e->SetIncomplete(true);
|
|
e->SetIsInSecondaryCache(is_in_sec_cache);
|
|
// This may be slightly inaccurate, if the lookup eventually fails.
|
|
// But the probability is very low.
|
|
PERF_COUNTER_ADD(secondary_cache_hit_count, 1);
|
|
RecordTick(stats, SECONDARY_CACHE_HITS);
|
|
}
|
|
}
|
|
}
|
|
return reinterpret_cast<Cache::Handle*>(e);
|
|
}
|
|
|
|
bool LRUCacheShard::Ref(Cache::Handle* h) {
|
|
LRUHandle* e = reinterpret_cast<LRUHandle*>(h);
|
|
DMutexLock l(mutex_);
|
|
// To create another reference - entry must be already externally referenced.
|
|
assert(e->HasRefs());
|
|
e->Ref();
|
|
return true;
|
|
}
|
|
|
|
void LRUCacheShard::SetHighPriorityPoolRatio(double high_pri_pool_ratio) {
|
|
DMutexLock l(mutex_);
|
|
high_pri_pool_ratio_ = high_pri_pool_ratio;
|
|
high_pri_pool_capacity_ = capacity_ * high_pri_pool_ratio_;
|
|
MaintainPoolSize();
|
|
}
|
|
|
|
bool LRUCacheShard::Release(Cache::Handle* handle, bool erase_if_last_ref) {
|
|
if (handle == nullptr) {
|
|
return false;
|
|
}
|
|
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
|
|
bool last_reference = false;
|
|
{
|
|
DMutexLock l(mutex_);
|
|
last_reference = e->Unref();
|
|
if (last_reference && e->InCache()) {
|
|
// The item is still in cache, and nobody else holds a reference to it.
|
|
if (usage_ > capacity_ || erase_if_last_ref) {
|
|
// The LRU list must be empty since the cache is full.
|
|
assert(lru_.next == &lru_ || erase_if_last_ref);
|
|
// Take this opportunity and remove the item.
|
|
table_.Remove(e->key(), e->hash);
|
|
e->SetInCache(false);
|
|
} else {
|
|
// Put the item back on the LRU list, and don't free it.
|
|
LRU_Insert(e);
|
|
last_reference = false;
|
|
}
|
|
}
|
|
// If it was the last reference, and the entry is either not secondary
|
|
// cache compatible (i.e a dummy entry for accounting), or is secondary
|
|
// cache compatible and has a non-null value, then decrement the cache
|
|
// usage. If value is null in the latter case, taht means the lookup
|
|
// failed and we didn't charge the cache.
|
|
if (last_reference && (!e->IsSecondaryCacheCompatible() || e->value)) {
|
|
assert(usage_ >= e->total_charge);
|
|
usage_ -= e->total_charge;
|
|
}
|
|
}
|
|
|
|
// Free the entry here outside of mutex for performance reasons.
|
|
if (last_reference) {
|
|
e->Free();
|
|
}
|
|
return last_reference;
|
|
}
|
|
|
|
Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
|
|
size_t charge,
|
|
void (*deleter)(const Slice& key, void* value),
|
|
const Cache::CacheItemHelper* helper,
|
|
Cache::Handle** handle, Cache::Priority priority) {
|
|
// Allocate the memory here outside of the mutex.
|
|
// If the cache is full, we'll have to release it.
|
|
// It shouldn't happen very often though.
|
|
LRUHandle* e = reinterpret_cast<LRUHandle*>(
|
|
new char[sizeof(LRUHandle) - 1 + key.size()]);
|
|
|
|
e->value = value;
|
|
e->flags = 0;
|
|
if (helper) {
|
|
e->SetSecondaryCacheCompatible(true);
|
|
e->info_.helper = helper;
|
|
} else {
|
|
#ifdef __SANITIZE_THREAD__
|
|
e->is_secondary_cache_compatible_for_tsan = false;
|
|
#endif // __SANITIZE_THREAD__
|
|
e->info_.deleter = deleter;
|
|
}
|
|
e->key_length = key.size();
|
|
e->hash = hash;
|
|
e->refs = 0;
|
|
e->next = e->prev = nullptr;
|
|
e->SetInCache(true);
|
|
e->SetPriority(priority);
|
|
memcpy(e->key_data, key.data(), key.size());
|
|
e->CalcTotalCharge(charge, metadata_charge_policy_);
|
|
|
|
return InsertItem(e, handle, /* free_handle_on_fail */ true);
|
|
}
|
|
|
|
void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
|
|
LRUHandle* e;
|
|
bool last_reference = false;
|
|
{
|
|
DMutexLock l(mutex_);
|
|
e = table_.Remove(key, hash);
|
|
if (e != nullptr) {
|
|
assert(e->InCache());
|
|
e->SetInCache(false);
|
|
if (!e->HasRefs()) {
|
|
// The entry is in LRU since it's in hash and has no external references
|
|
LRU_Remove(e);
|
|
assert(usage_ >= e->total_charge);
|
|
usage_ -= e->total_charge;
|
|
last_reference = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Free the entry here outside of mutex for performance reasons.
|
|
// last_reference will only be true if e != nullptr.
|
|
if (last_reference) {
|
|
e->Free();
|
|
}
|
|
}
|
|
|
|
bool LRUCacheShard::IsReady(Cache::Handle* handle) {
|
|
LRUHandle* e = reinterpret_cast<LRUHandle*>(handle);
|
|
DMutexLock l(mutex_);
|
|
bool ready = true;
|
|
if (e->IsPending()) {
|
|
assert(secondary_cache_);
|
|
assert(e->sec_handle);
|
|
ready = e->sec_handle->IsReady();
|
|
}
|
|
return ready;
|
|
}
|
|
|
|
size_t LRUCacheShard::GetUsage() const {
|
|
DMutexLock l(mutex_);
|
|
return usage_;
|
|
}
|
|
|
|
size_t LRUCacheShard::GetPinnedUsage() const {
|
|
DMutexLock l(mutex_);
|
|
assert(usage_ >= lru_usage_);
|
|
return usage_ - lru_usage_;
|
|
}
|
|
|
|
std::string LRUCacheShard::GetPrintableOptions() const {
|
|
const int kBufferSize = 200;
|
|
char buffer[kBufferSize];
|
|
{
|
|
DMutexLock l(mutex_);
|
|
snprintf(buffer, kBufferSize, " high_pri_pool_ratio: %.3lf\n",
|
|
high_pri_pool_ratio_);
|
|
}
|
|
return std::string(buffer);
|
|
}
|
|
|
|
LRUCache::LRUCache(size_t capacity, int num_shard_bits,
|
|
bool strict_capacity_limit, double high_pri_pool_ratio,
|
|
std::shared_ptr<MemoryAllocator> allocator,
|
|
bool use_adaptive_mutex,
|
|
CacheMetadataChargePolicy metadata_charge_policy,
|
|
const std::shared_ptr<SecondaryCache>& secondary_cache)
|
|
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
|
|
std::move(allocator)) {
|
|
num_shards_ = 1 << num_shard_bits;
|
|
shards_ = reinterpret_cast<LRUCacheShard*>(
|
|
port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
|
|
size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_;
|
|
for (int i = 0; i < num_shards_; i++) {
|
|
new (&shards_[i]) LRUCacheShard(
|
|
per_shard, strict_capacity_limit, high_pri_pool_ratio,
|
|
use_adaptive_mutex, metadata_charge_policy,
|
|
/* max_upper_hash_bits */ 32 - num_shard_bits, secondary_cache);
|
|
}
|
|
secondary_cache_ = secondary_cache;
|
|
}
|
|
|
|
LRUCache::~LRUCache() {
|
|
if (shards_ != nullptr) {
|
|
assert(num_shards_ > 0);
|
|
for (int i = 0; i < num_shards_; i++) {
|
|
shards_[i].~LRUCacheShard();
|
|
}
|
|
port::cacheline_aligned_free(shards_);
|
|
}
|
|
}
|
|
|
|
CacheShard* LRUCache::GetShard(uint32_t shard) {
|
|
return reinterpret_cast<CacheShard*>(&shards_[shard]);
|
|
}
|
|
|
|
const CacheShard* LRUCache::GetShard(uint32_t shard) const {
|
|
return reinterpret_cast<CacheShard*>(&shards_[shard]);
|
|
}
|
|
|
|
void* LRUCache::Value(Handle* handle) {
|
|
return reinterpret_cast<const LRUHandle*>(handle)->value;
|
|
}
|
|
|
|
size_t LRUCache::GetCharge(Handle* handle) const {
|
|
CacheMetadataChargePolicy metadata_charge_policy = kDontChargeCacheMetadata;
|
|
if (num_shards_ > 0) {
|
|
metadata_charge_policy = shards_[0].metadata_charge_policy_;
|
|
}
|
|
return reinterpret_cast<const LRUHandle*>(handle)->GetCharge(
|
|
metadata_charge_policy);
|
|
}
|
|
|
|
Cache::DeleterFn LRUCache::GetDeleter(Handle* handle) const {
|
|
auto h = reinterpret_cast<const LRUHandle*>(handle);
|
|
if (h->IsSecondaryCacheCompatible()) {
|
|
return h->info_.helper->del_cb;
|
|
} else {
|
|
return h->info_.deleter;
|
|
}
|
|
}
|
|
|
|
uint32_t LRUCache::GetHash(Handle* handle) const {
|
|
return reinterpret_cast<const LRUHandle*>(handle)->hash;
|
|
}
|
|
|
|
void LRUCache::DisownData() {
|
|
// Leak data only if that won't generate an ASAN/valgrind warning.
|
|
if (!kMustFreeHeapAllocations) {
|
|
shards_ = nullptr;
|
|
num_shards_ = 0;
|
|
}
|
|
}
|
|
|
|
size_t LRUCache::TEST_GetLRUSize() {
|
|
size_t lru_size_of_all_shards = 0;
|
|
for (int i = 0; i < num_shards_; i++) {
|
|
lru_size_of_all_shards += shards_[i].TEST_GetLRUSize();
|
|
}
|
|
return lru_size_of_all_shards;
|
|
}
|
|
|
|
double LRUCache::GetHighPriPoolRatio() {
|
|
double result = 0.0;
|
|
if (num_shards_ > 0) {
|
|
result = shards_[0].GetHighPriPoolRatio();
|
|
}
|
|
return result;
|
|
}
|
|
|
|
void LRUCache::WaitAll(std::vector<Handle*>& handles) {
|
|
if (secondary_cache_) {
|
|
std::vector<SecondaryCacheResultHandle*> sec_handles;
|
|
sec_handles.reserve(handles.size());
|
|
for (Handle* handle : handles) {
|
|
if (!handle) {
|
|
continue;
|
|
}
|
|
LRUHandle* lru_handle = reinterpret_cast<LRUHandle*>(handle);
|
|
if (!lru_handle->IsPending()) {
|
|
continue;
|
|
}
|
|
sec_handles.emplace_back(lru_handle->sec_handle);
|
|
}
|
|
secondary_cache_->WaitAll(sec_handles);
|
|
for (Handle* handle : handles) {
|
|
if (!handle) {
|
|
continue;
|
|
}
|
|
LRUHandle* lru_handle = reinterpret_cast<LRUHandle*>(handle);
|
|
if (!lru_handle->IsPending()) {
|
|
continue;
|
|
}
|
|
uint32_t hash = GetHash(handle);
|
|
LRUCacheShard* shard = static_cast<LRUCacheShard*>(GetShard(Shard(hash)));
|
|
shard->Promote(lru_handle);
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace lru_cache
|
|
|
|
std::shared_ptr<Cache> NewLRUCache(
|
|
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
|
|
double high_pri_pool_ratio,
|
|
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
|
|
CacheMetadataChargePolicy metadata_charge_policy,
|
|
const std::shared_ptr<SecondaryCache>& secondary_cache) {
|
|
if (num_shard_bits >= 20) {
|
|
return nullptr; // The cache cannot be sharded into too many fine pieces.
|
|
}
|
|
if (high_pri_pool_ratio < 0.0 || high_pri_pool_ratio > 1.0) {
|
|
// Invalid high_pri_pool_ratio
|
|
return nullptr;
|
|
}
|
|
if (num_shard_bits < 0) {
|
|
num_shard_bits = GetDefaultCacheShardBits(capacity);
|
|
}
|
|
return std::make_shared<LRUCache>(
|
|
capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
|
|
std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy,
|
|
secondary_cache);
|
|
}
|
|
|
|
std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
|
|
return NewLRUCache(
|
|
cache_opts.capacity, cache_opts.num_shard_bits,
|
|
cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
|
|
cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
|
|
cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
|
|
}
|
|
|
|
std::shared_ptr<Cache> NewLRUCache(
|
|
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
|
|
double high_pri_pool_ratio,
|
|
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
|
|
CacheMetadataChargePolicy metadata_charge_policy) {
|
|
return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
|
|
high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
|
|
metadata_charge_policy, nullptr);
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|