rocksdb/cache/clock_cache.cc

840 lines
30 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "cache/clock_cache.h"
#ifndef SUPPORT_CLOCK_CACHE
namespace ROCKSDB_NAMESPACE {
std::shared_ptr<Cache> NewClockCache(
size_t /*capacity*/, int /*num_shard_bits*/, bool /*strict_capacity_limit*/,
CacheMetadataChargePolicy /*metadata_charge_policy*/) {
// Clock cache not supported.
return nullptr;
}
} // namespace ROCKSDB_NAMESPACE
#else
#include <assert.h>
#include <atomic>
#include <deque>
// "tbb/concurrent_hash_map.h" requires RTTI if exception is enabled.
// Disable it so users can chooose to disable RTTI.
#ifndef ROCKSDB_USE_RTTI
#define TBB_USE_EXCEPTIONS 0
#endif
#include "cache/sharded_cache.h"
#include "port/lang.h"
#include "port/malloc.h"
#include "port/port.h"
#include "tbb/concurrent_hash_map.h"
#include "util/autovector.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
namespace {
// An implementation of the Cache interface based on CLOCK algorithm, with
// better concurrent performance than LRUCache. The idea of CLOCK algorithm
// is to maintain all cache entries in a circular list, and an iterator
// (the "head") pointing to the last examined entry. Eviction starts from the
// current head. Each entry is given a second chance before eviction, if it
// has been access since last examine. In contrast to LRU, no modification
// to the internal data-structure (except for flipping the usage bit) needs
// to be done upon lookup. This gives us oppertunity to implement a cache
// with better concurrency.
//
// Each cache entry is represented by a cache handle, and all the handles
// are arranged in a circular list, as describe above. Upon erase of an entry,
// we never remove the handle. Instead, the handle is put into a recycle bin
// to be re-use. This is to avoid memory dealocation, which is hard to deal
// with in concurrent environment.
//
// The cache also maintains a concurrent hash map for lookup. Any concurrent
// hash map implementation should do the work. We currently use
// tbb::concurrent_hash_map because it supports concurrent erase.
//
// Each cache handle has the following flags and counters, which are squeeze
// in an atomic interger, to make sure the handle always be in a consistent
// state:
//
// * In-cache bit: whether the entry is reference by the cache itself. If
// an entry is in cache, its key would also be available in the hash map.
// * Usage bit: whether the entry has been access by user since last
// examine for eviction. Can be reset by eviction.
// * Reference count: reference count by user.
//
// An entry can be reference only when it's in cache. An entry can be evicted
// only when it is in cache, has no usage since last examine, and reference
// count is zero.
//
// The follow figure shows a possible layout of the cache. Boxes represents
// cache handles and numbers in each box being in-cache bit, usage bit and
// reference count respectively.
//
// hash map:
// +-------+--------+
// | key | handle |
// +-------+--------+
// | "foo" | 5 |-------------------------------------+
// +-------+--------+ |
// | "bar" | 2 |--+ |
// +-------+--------+ | |
// | |
// head | |
// | | |
// circular list: | | |
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
// |(0,0,0)|---|(1,1,0)|---|(0,0,0)|---|(0,1,3)|---|(1,0,0)|---| ...
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
// | |
// +-------+ +-----------+
// | |
// +---+---+
// recycle bin: | 1 | 3 |
// +---+---+
//
// Suppose we try to insert "baz" into the cache at this point and the cache is
// full. The cache will first look for entries to evict, starting from where
// head points to (the second entry). It resets usage bit of the second entry,
// skips the third and fourth entry since they are not in cache, and finally
// evict the fifth entry ("foo"). It looks at recycle bin for available handle,
// grabs handle 3, and insert the key into the handle. The following figure
// shows the resulting layout.
//
// hash map:
// +-------+--------+
// | key | handle |
// +-------+--------+
// | "baz" | 3 |-------------+
// +-------+--------+ |
// | "bar" | 2 |--+ |
// +-------+--------+ | |
// | |
// | | head
// | | |
// circular list: | | |
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
// |(0,0,0)|---|(1,0,0)|---|(1,0,0)|---|(0,1,3)|---|(0,0,0)|---| ...
// +-------+ +-------+ +-------+ +-------+ +-------+ +-------
// | |
// +-------+ +-----------------------------------+
// | |
// +---+---+
// recycle bin: | 1 | 5 |
// +---+---+
//
// A global mutex guards the circular list, the head, and the recycle bin.
// We additionally require that modifying the hash map needs to hold the mutex.
// As such, Modifying the cache (such as Insert() and Erase()) require to
// hold the mutex. Lookup() only access the hash map and the flags associated
// with each handle, and don't require explicit locking. Release() has to
// acquire the mutex only when it releases the last reference to the entry and
// the entry has been erased from cache explicitly. A future improvement could
// be to remove the mutex completely.
//
// Benchmark:
// We run readrandom db_bench on a test DB of size 13GB, with size of each
// level:
//
// Level Files Size(MB)
// -------------------------
// L0 1 0.01
// L1 18 17.32
// L2 230 182.94
// L3 1186 1833.63
// L4 4602 8140.30
//
// We test with both 32 and 16 read threads, with 2GB cache size (the whole DB
// doesn't fits in) and 64GB cache size (the whole DB can fit in cache), and
// whether to put index and filter blocks in block cache. The benchmark runs
// with
// with RocksDB 4.10. We got the following result:
//
// Threads Cache Cache ClockCache LRUCache
// Size Index/Filter Throughput(MB/s) Hit Throughput(MB/s) Hit
// 32 2GB yes 466.7 85.9% 433.7 86.5%
// 32 2GB no 529.9 72.7% 532.7 73.9%
// 32 64GB yes 649.9 99.9% 507.9 99.9%
// 32 64GB no 740.4 99.9% 662.8 99.9%
// 16 2GB yes 278.4 85.9% 283.4 86.5%
// 16 2GB no 318.6 72.7% 335.8 73.9%
// 16 64GB yes 391.9 99.9% 353.3 99.9%
// 16 64GB no 433.8 99.8% 419.4 99.8%
// Cache entry meta data.
struct CacheHandle {
Slice key;
void* value;
size_t charge;
Cache::DeleterFn deleter;
uint32_t hash;
// Addition to "charge" to get "total charge" under metadata policy.
uint32_t meta_charge;
// Flags and counters associated with the cache handle:
// lowest bit: in-cache bit
// second lowest bit: usage bit
// the rest bits: reference count
// The handle is unused when flags equals to 0. The thread decreases the count
// to 0 is responsible to put the handle back to recycle_ and cleanup memory.
std::atomic<uint32_t> flags;
CacheHandle() = default;
CacheHandle(const CacheHandle& a) { *this = a; }
CacheHandle(const Slice& k, void* v,
void (*del)(const Slice& key, void* value))
: key(k), value(v), deleter(del) {}
CacheHandle& operator=(const CacheHandle& a) {
// Only copy members needed for deletion.
key = a.key;
value = a.value;
deleter = a.deleter;
return *this;
}
inline static uint32_t CalcMetadataCharge(
Slice key, CacheMetadataChargePolicy metadata_charge_policy) {
size_t meta_charge = 0;
if (metadata_charge_policy == kFullChargeCacheMetadata) {
meta_charge += sizeof(CacheHandle);
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
meta_charge +=
malloc_usable_size(static_cast<void*>(const_cast<char*>(key.data())));
#else
meta_charge += key.size();
#endif
}
assert(meta_charge <= UINT32_MAX);
return static_cast<uint32_t>(meta_charge);
}
inline size_t GetTotalCharge() { return charge + meta_charge; }
};
// Key of hash map. We store hash value with the key for convenience.
struct ClockCacheKey {
Slice key;
uint32_t hash_value;
ClockCacheKey() = default;
ClockCacheKey(const Slice& k, uint32_t h) {
key = k;
hash_value = h;
}
static bool equal(const ClockCacheKey& a, const ClockCacheKey& b) {
return a.hash_value == b.hash_value && a.key == b.key;
}
static size_t hash(const ClockCacheKey& a) {
return static_cast<size_t>(a.hash_value);
}
};
struct CleanupContext {
// List of values to be deleted, along with the key and deleter.
autovector<CacheHandle> to_delete_value;
// List of keys to be deleted.
autovector<const char*> to_delete_key;
};
// A cache shard which maintains its own CLOCK cache.
class ClockCacheShard final : public CacheShard {
public:
// Hash map type.
using HashTable =
tbb::concurrent_hash_map<ClockCacheKey, CacheHandle*, ClockCacheKey>;
ClockCacheShard();
~ClockCacheShard() override;
// Interfaces
void SetCapacity(size_t capacity) override;
void SetStrictCapacityLimit(bool strict_capacity_limit) override;
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** handle, Cache::Priority priority) override;
Status Insert(const Slice& key, uint32_t hash, void* value,
const Cache::CacheItemHelper* helper, size_t charge,
Cache::Handle** handle, Cache::Priority priority) override {
return Insert(key, hash, value, charge, helper->del_cb, handle, priority);
}
Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
Cache::Handle* Lookup(const Slice& key, uint32_t hash,
const Cache::CacheItemHelper* /*helper*/,
const Cache::CreateCallback& /*create_cb*/,
Cache::Priority /*priority*/, bool /*wait*/,
Statistics* /*stats*/) override {
return Lookup(key, hash);
}
bool Release(Cache::Handle* handle, bool /*useful*/,
bool force_erase) override {
return Release(handle, force_erase);
}
bool IsReady(Cache::Handle* /*handle*/) override { return true; }
void Wait(Cache::Handle* /*handle*/) override {}
// If the entry in in cache, increase reference count and return true.
// Return false otherwise.
//
// Not necessary to hold mutex_ before being called.
bool Ref(Cache::Handle* handle) override;
bool Release(Cache::Handle* handle, bool force_erase = false) override;
void Erase(const Slice& key, uint32_t hash) override;
bool EraseAndConfirm(const Slice& key, uint32_t hash,
CleanupContext* context);
size_t GetUsage() const override;
size_t GetPinnedUsage() const override;
void EraseUnRefEntries() override;
void ApplyToSomeEntries(
const std::function<void(const Slice& key, void* value, size_t charge,
DeleterFn deleter)>& callback,
uint32_t average_entries_per_lock, uint32_t* state) override;
private:
static const uint32_t kInCacheBit = 1;
static const uint32_t kUsageBit = 2;
static const uint32_t kRefsOffset = 2;
static const uint32_t kOneRef = 1 << kRefsOffset;
// Helper functions to extract cache handle flags and counters.
static bool InCache(uint32_t flags) { return flags & kInCacheBit; }
static bool HasUsage(uint32_t flags) { return flags & kUsageBit; }
static uint32_t CountRefs(uint32_t flags) { return flags >> kRefsOffset; }
// Decrease reference count of the entry. If this decreases the count to 0,
// recycle the entry. If set_usage is true, also set the usage bit.
//
// returns true if a value is erased.
//
// Not necessary to hold mutex_ before being called.
bool Unref(CacheHandle* handle, bool set_usage, CleanupContext* context);
// Unset in-cache bit of the entry. Recycle the handle if necessary.
//
// returns true if a value is erased.
//
// Has to hold mutex_ before being called.
bool UnsetInCache(CacheHandle* handle, CleanupContext* context);
// Put the handle back to recycle_ list, and put the value associated with
// it into to-be-deleted list. It doesn't cleanup the key as it might be
// reused by another handle.
//
// Has to hold mutex_ before being called.
void RecycleHandle(CacheHandle* handle, CleanupContext* context);
// Delete keys and values in to-be-deleted list. Call the method without
// holding mutex, as destructors can be expensive.
void Cleanup(const CleanupContext& context);
// Examine the handle for eviction. If the handle is in cache, usage bit is
// not set, and referece count is 0, evict it from cache. Otherwise unset
// the usage bit.
//
// Has to hold mutex_ before being called.
bool TryEvict(CacheHandle* value, CleanupContext* context);
// Scan through the circular list, evict entries until we get enough capacity
// for new cache entry of specific size. Return true if success, false
// otherwise.
//
// Has to hold mutex_ before being called.
bool EvictFromCache(size_t charge, CleanupContext* context);
CacheHandle* Insert(const Slice& key, uint32_t hash, void* value,
size_t change,
void (*deleter)(const Slice& key, void* value),
bool hold_reference, CleanupContext* context,
bool* overwritten);
// Guards list_, head_, and recycle_. In addition, updating table_ also has
// to hold the mutex, to avoid the cache being in inconsistent state.
mutable port::Mutex mutex_;
// The circular list of cache handles. Initially the list is empty. Once a
// handle is needed by insertion, and no more handles are available in
// recycle bin, one more handle is appended to the end.
//
// We use std::deque for the circular list because we want to make sure
// pointers to handles are valid through out the life-cycle of the cache
// (in contrast to std::vector), and be able to grow the list (in contrast
// to statically allocated arrays).
std::deque<CacheHandle> list_;
// Pointer to the next handle in the circular list to be examine for
// eviction.
size_t head_;
// Recycle bin of cache handles.
autovector<CacheHandle*> recycle_;
// Maximum cache size.
std::atomic<size_t> capacity_;
// Current total size of the cache.
std::atomic<size_t> usage_;
// Total un-released cache size.
std::atomic<size_t> pinned_usage_;
// Whether allow insert into cache if cache is full.
std::atomic<bool> strict_capacity_limit_;
// Hash table (tbb::concurrent_hash_map) for lookup.
HashTable table_;
};
ClockCacheShard::ClockCacheShard()
: head_(0), usage_(0), pinned_usage_(0), strict_capacity_limit_(false) {}
ClockCacheShard::~ClockCacheShard() {
for (auto& handle : list_) {
uint32_t flags = handle.flags.load(std::memory_order_relaxed);
if (InCache(flags) || CountRefs(flags) > 0) {
if (handle.deleter != nullptr) {
(*handle.deleter)(handle.key, handle.value);
}
delete[] handle.key.data();
}
}
}
size_t ClockCacheShard::GetUsage() const {
return usage_.load(std::memory_order_relaxed);
}
size_t ClockCacheShard::GetPinnedUsage() const {
return pinned_usage_.load(std::memory_order_relaxed);
}
void ClockCacheShard::ApplyToSomeEntries(
const std::function<void(const Slice& key, void* value, size_t charge,
DeleterFn deleter)>& callback,
uint32_t average_entries_per_lock, uint32_t* state) {
assert(average_entries_per_lock > 0);
MutexLock lock(&mutex_);
// Figure out the range to iterate, update `state`
size_t list_size = list_.size();
size_t start_idx = *state;
size_t end_idx = start_idx + average_entries_per_lock;
if (start_idx > list_size) {
// Shouldn't reach here, but recoverable
assert(false);
// Mark finished with all
*state = UINT32_MAX;
return;
}
if (end_idx >= list_size || end_idx >= UINT32_MAX) {
// This also includes the hypothetical case of >4 billion
// cache handles.
end_idx = list_size;
// Mark finished with all
*state = UINT32_MAX;
} else {
*state = static_cast<uint32_t>(end_idx);
}
// Do the iteration
auto cur = list_.begin() + start_idx;
auto end = list_.begin() + end_idx;
for (; cur != end; ++cur) {
const CacheHandle& handle = *cur;
// Use relaxed semantics instead of acquire semantics since we are
// holding mutex
uint32_t flags = handle.flags.load(std::memory_order_relaxed);
if (InCache(flags)) {
callback(handle.key, handle.value, handle.charge, handle.deleter);
}
}
}
void ClockCacheShard::RecycleHandle(CacheHandle* handle,
CleanupContext* context) {
mutex_.AssertHeld();
assert(!InCache(handle->flags) && CountRefs(handle->flags) == 0);
context->to_delete_key.push_back(handle->key.data());
context->to_delete_value.emplace_back(*handle);
size_t total_charge = handle->GetTotalCharge();
// clearing `handle` fields would go here but not strictly required
recycle_.push_back(handle);
usage_.fetch_sub(total_charge, std::memory_order_relaxed);
}
void ClockCacheShard::Cleanup(const CleanupContext& context) {
for (const CacheHandle& handle : context.to_delete_value) {
if (handle.deleter) {
(*handle.deleter)(handle.key, handle.value);
}
}
for (const char* key : context.to_delete_key) {
delete[] key;
}
}
bool ClockCacheShard::Ref(Cache::Handle* h) {
auto handle = reinterpret_cast<CacheHandle*>(h);
// CAS loop to increase reference count.
uint32_t flags = handle->flags.load(std::memory_order_relaxed);
while (InCache(flags)) {
// Use acquire semantics on success, as further operations on the cache
// entry has to be order after reference count is increased.
if (handle->flags.compare_exchange_weak(flags, flags + kOneRef,
std::memory_order_acquire,
std::memory_order_relaxed)) {
if (CountRefs(flags) == 0) {
// No reference count before the operation.
size_t total_charge = handle->GetTotalCharge();
pinned_usage_.fetch_add(total_charge, std::memory_order_relaxed);
}
return true;
}
}
return false;
}
bool ClockCacheShard::Unref(CacheHandle* handle, bool set_usage,
CleanupContext* context) {
if (set_usage) {
handle->flags.fetch_or(kUsageBit, std::memory_order_relaxed);
}
// If the handle reaches state refs=0 and InCache=true after this
// atomic operation then we cannot access `handle` afterward, because
// it could be evicted before we access the `handle`.
size_t total_charge = handle->GetTotalCharge();
// Use acquire-release semantics as previous operations on the cache entry
// has to be order before reference count is decreased, and potential cleanup
// of the entry has to be order after.
uint32_t flags = handle->flags.fetch_sub(kOneRef, std::memory_order_acq_rel);
assert(CountRefs(flags) > 0);
if (CountRefs(flags) == 1) {
// this is the last reference.
pinned_usage_.fetch_sub(total_charge, std::memory_order_relaxed);
// Cleanup if it is the last reference.
if (!InCache(flags)) {
MutexLock l(&mutex_);
RecycleHandle(handle, context);
}
}
return context->to_delete_value.size();
}
bool ClockCacheShard::UnsetInCache(CacheHandle* handle,
CleanupContext* context) {
mutex_.AssertHeld();
// Use acquire-release semantics as previous operations on the cache entry
// has to be order before reference count is decreased, and potential cleanup
// of the entry has to be order after.
uint32_t flags =
handle->flags.fetch_and(~kInCacheBit, std::memory_order_acq_rel);
// Cleanup if it is the last reference.
if (InCache(flags) && CountRefs(flags) == 0) {
RecycleHandle(handle, context);
}
return context->to_delete_value.size();
}
bool ClockCacheShard::TryEvict(CacheHandle* handle, CleanupContext* context) {
mutex_.AssertHeld();
uint32_t flags = kInCacheBit;
if (handle->flags.compare_exchange_strong(flags, 0, std::memory_order_acquire,
std::memory_order_relaxed)) {
bool erased __attribute__((__unused__)) =
table_.erase(ClockCacheKey(handle->key, handle->hash));
assert(erased);
RecycleHandle(handle, context);
return true;
}
handle->flags.fetch_and(~kUsageBit, std::memory_order_relaxed);
return false;
}
bool ClockCacheShard::EvictFromCache(size_t charge, CleanupContext* context) {
size_t usage = usage_.load(std::memory_order_relaxed);
size_t capacity = capacity_.load(std::memory_order_relaxed);
if (usage == 0) {
return charge <= capacity;
}
size_t new_head = head_;
bool second_iteration = false;
while (usage + charge > capacity) {
assert(new_head < list_.size());
if (TryEvict(&list_[new_head], context)) {
usage = usage_.load(std::memory_order_relaxed);
}
new_head = (new_head + 1 >= list_.size()) ? 0 : new_head + 1;
if (new_head == head_) {
if (second_iteration) {
return false;
} else {
second_iteration = true;
}
}
}
head_ = new_head;
return true;
}
void ClockCacheShard::SetCapacity(size_t capacity) {
CleanupContext context;
{
MutexLock l(&mutex_);
capacity_.store(capacity, std::memory_order_relaxed);
EvictFromCache(0, &context);
}
Cleanup(context);
}
void ClockCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
strict_capacity_limit_.store(strict_capacity_limit,
std::memory_order_relaxed);
}
CacheHandle* ClockCacheShard::Insert(
const Slice& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), bool hold_reference,
CleanupContext* context, bool* overwritten) {
assert(overwritten != nullptr && *overwritten == false);
uint32_t meta_charge =
CacheHandle::CalcMetadataCharge(key, metadata_charge_policy_);
size_t total_charge = charge + meta_charge;
MutexLock l(&mutex_);
bool success = EvictFromCache(total_charge, context);
bool strict = strict_capacity_limit_.load(std::memory_order_relaxed);
if (!success && (strict || !hold_reference)) {
context->to_delete_key.push_back(key.data());
if (!hold_reference) {
context->to_delete_value.emplace_back(key, value, deleter);
}
return nullptr;
}
// Grab available handle from recycle bin. If recycle bin is empty, create
// and append new handle to end of circular list.
CacheHandle* handle = nullptr;
if (!recycle_.empty()) {
handle = recycle_.back();
recycle_.pop_back();
} else {
list_.emplace_back();
handle = &list_.back();
}
// Fill handle.
handle->key = key;
handle->hash = hash;
handle->value = value;
handle->charge = charge;
handle->meta_charge = meta_charge;
handle->deleter = deleter;
uint32_t flags = hold_reference ? kInCacheBit + kOneRef : kInCacheBit;
// TODO investigate+fix suspected race condition:
// [thread 1] Lookup starts, up to Ref()
// [thread 2] Erase/evict the entry just looked up
// [thread 1] Ref() the handle, even though it's in the recycle bin
// [thread 2] Insert with recycling that handle
// Here we obliterate the other thread's Ref
// Possible fix: never blindly overwrite the flags, but only make
// relative updates (fetch_add, etc).
handle->flags.store(flags, std::memory_order_relaxed);
HashTable::accessor accessor;
if (table_.find(accessor, ClockCacheKey(key, hash))) {
*overwritten = true;
CacheHandle* existing_handle = accessor->second;
table_.erase(accessor);
UnsetInCache(existing_handle, context);
}
table_.insert(HashTable::value_type(ClockCacheKey(key, hash), handle));
if (hold_reference) {
pinned_usage_.fetch_add(total_charge, std::memory_order_relaxed);
}
usage_.fetch_add(total_charge, std::memory_order_relaxed);
return handle;
}
Status ClockCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
size_t charge,
void (*deleter)(const Slice& key, void* value),
Cache::Handle** out_handle,
Cache::Priority /*priority*/) {
CleanupContext context;
HashTable::accessor accessor;
char* key_data = new char[key.size()];
memcpy(key_data, key.data(), key.size());
Slice key_copy(key_data, key.size());
bool overwritten = false;
CacheHandle* handle = Insert(key_copy, hash, value, charge, deleter,
out_handle != nullptr, &context, &overwritten);
Status s;
if (out_handle != nullptr) {
if (handle == nullptr) {
s = Status::Incomplete("Insert failed due to LRU cache being full.");
} else {
*out_handle = reinterpret_cast<Cache::Handle*>(handle);
}
}
if (overwritten) {
assert(s.ok());
s = Status::OkOverwritten();
}
Cleanup(context);
return s;
}
Cache::Handle* ClockCacheShard::Lookup(const Slice& key, uint32_t hash) {
HashTable::const_accessor accessor;
if (!table_.find(accessor, ClockCacheKey(key, hash))) {
return nullptr;
}
CacheHandle* handle = accessor->second;
accessor.release();
// Ref() could fail if another thread sneak in and evict/erase the cache
// entry before we are able to hold reference.
if (!Ref(reinterpret_cast<Cache::Handle*>(handle))) {
return nullptr;
}
// Double check the key since the handle may now representing another key
// if other threads sneak in, evict/erase the entry and re-used the handle
// for another cache entry.
if (hash != handle->hash || key != handle->key) {
CleanupContext context;
Unref(handle, false, &context);
// It is possible Unref() delete the entry, so we need to cleanup.
Cleanup(context);
return nullptr;
}
return reinterpret_cast<Cache::Handle*>(handle);
}
bool ClockCacheShard::Release(Cache::Handle* h, bool force_erase) {
CleanupContext context;
CacheHandle* handle = reinterpret_cast<CacheHandle*>(h);
bool erased = Unref(handle, true, &context);
if (force_erase && !erased) {
erased = EraseAndConfirm(handle->key, handle->hash, &context);
}
Cleanup(context);
return erased;
}
void ClockCacheShard::Erase(const Slice& key, uint32_t hash) {
CleanupContext context;
EraseAndConfirm(key, hash, &context);
Cleanup(context);
}
bool ClockCacheShard::EraseAndConfirm(const Slice& key, uint32_t hash,
CleanupContext* context) {
MutexLock l(&mutex_);
HashTable::accessor accessor;
bool erased = false;
if (table_.find(accessor, ClockCacheKey(key, hash))) {
CacheHandle* handle = accessor->second;
table_.erase(accessor);
erased = UnsetInCache(handle, context);
}
return erased;
}
void ClockCacheShard::EraseUnRefEntries() {
CleanupContext context;
{
MutexLock l(&mutex_);
table_.clear();
for (auto& handle : list_) {
UnsetInCache(&handle, &context);
}
}
Cleanup(context);
}
class ClockCache final : public ShardedCache {
public:
ClockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
CacheMetadataChargePolicy metadata_charge_policy)
: ShardedCache(capacity, num_shard_bits, strict_capacity_limit) {
int num_shards = 1 << num_shard_bits;
shards_ = new ClockCacheShard[num_shards];
for (int i = 0; i < num_shards; i++) {
shards_[i].set_metadata_charge_policy(metadata_charge_policy);
}
SetCapacity(capacity);
SetStrictCapacityLimit(strict_capacity_limit);
}
~ClockCache() override { delete[] shards_; }
const char* Name() const override { return "ClockCache"; }
CacheShard* GetShard(uint32_t shard) override {
return reinterpret_cast<CacheShard*>(&shards_[shard]);
}
const CacheShard* GetShard(uint32_t shard) const override {
return reinterpret_cast<CacheShard*>(&shards_[shard]);
}
void* Value(Handle* handle) override {
return reinterpret_cast<const CacheHandle*>(handle)->value;
}
size_t GetCharge(Handle* handle) const override {
return reinterpret_cast<const CacheHandle*>(handle)->charge;
}
uint32_t GetHash(Handle* handle) const override {
return reinterpret_cast<const CacheHandle*>(handle)->hash;
}
DeleterFn GetDeleter(Handle* handle) const override {
return reinterpret_cast<const CacheHandle*>(handle)->deleter;
}
void DisownData() override {
// Leak data only if that won't generate an ASAN/valgrind warning
if (!kMustFreeHeapAllocations) {
shards_ = nullptr;
}
}
void WaitAll(std::vector<Handle*>& /*handles*/) override {}
private:
ClockCacheShard* shards_;
};
} // end anonymous namespace
std::shared_ptr<Cache> NewClockCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
CacheMetadataChargePolicy metadata_charge_policy) {
if (num_shard_bits < 0) {
num_shard_bits = GetDefaultCacheShardBits(capacity);
}
return std::make_shared<ClockCache>(
capacity, num_shard_bits, strict_capacity_limit, metadata_charge_policy);
}
} // namespace ROCKSDB_NAMESPACE
#endif // SUPPORT_CLOCK_CACHE