mirror of https://github.com/facebook/rocksdb.git
Persistent Read Cache (5) Volatile cache tier implementation
Summary: This provides provides an implementation of PersistentCacheTier that is specialized for RAM. This tier does not persist data though. Why do we need this tier ? This is ideal as tier 0. This tier can host data that is too hot. Why can't we use Cache variants ? Yes you can use them instead. This tier can potentially outperform BlockCache in RAW mode by virtue of compression and compressed cache in block cache doesn't seem very popular. Potentially this tier can be modified to under stand the disadvantage of the tier below and retain data that the tier below is bad at handling (for example index and bloom data that is huge in size) Test Plan: Run unit tests added Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D57069
This commit is contained in:
parent
fda098461b
commit
d755c62f92
6
Makefile
6
Makefile
|
@ -361,7 +361,8 @@ TESTS = \
|
|||
compaction_job_stats_test \
|
||||
transaction_test \
|
||||
ldb_cmd_test \
|
||||
iostats_context_test
|
||||
iostats_context_test \
|
||||
persistent_cache_test \
|
||||
|
||||
PARALLEL_TEST = \
|
||||
backupable_db_test \
|
||||
|
@ -1183,6 +1184,9 @@ ldb: tools/ldb.o $(LIBOBJECTS)
|
|||
iostats_context_test: util/iostats_context_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_V_CCLD)$(CXX) $^ $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
|
||||
|
||||
persistent_cache_test: utilities/persistent_cache/persistent_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(AM_LINK)
|
||||
|
||||
#-------------------------------------------------
|
||||
# make install related stuff
|
||||
INSTALL_PATH ?= /usr/local
|
||||
|
|
1
src.mk
1
src.mk
|
@ -132,6 +132,7 @@ LIB_SOURCES = \
|
|||
utilities/merge_operators/uint64add.cc \
|
||||
utilities/options/options_util.cc \
|
||||
utilities/persistent_cache/persistent_cache_tier.cc \
|
||||
utilities/persistent_cache/volatile_tier_impl.cc \
|
||||
utilities/redis/redis_lists.cc \
|
||||
utilities/simulator_cache/sim_cache.cc \
|
||||
utilities/spatialdb/spatial_db.cc \
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "util/random.h"
|
||||
#include "utilities/persistent_cache/hash_table.h"
|
||||
#include "utilities/persistent_cache/lrulist.h"
|
||||
|
||||
|
@ -63,7 +64,7 @@ class EvictableHashTable : private HashTable<T*, Hash, Equal> {
|
|||
port::RWMutex& lock = GetMutex(h);
|
||||
|
||||
ReadLock _(&lock);
|
||||
if (hash_table::Find(bucket, t, ret)) {
|
||||
if (hash_table::Find(&bucket, t, ret)) {
|
||||
++(*ret)->refs_;
|
||||
lru.Touch(*ret);
|
||||
return true;
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "utilities/persistent_cache/persistent_cache_test.h"
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <thread>
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// Volatile cache tests
|
||||
TEST_F(PersistentCacheTierTest, VolatileCacheInsert) {
|
||||
for (auto nthreads : {1, 5}) {
|
||||
for (auto max_keys : {10 * 1024, 1 * 1024 * 1024}) {
|
||||
cache_ = std::make_shared<VolatileCacheTier>();
|
||||
RunInsertTest(nthreads, max_keys);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(PersistentCacheTierTest, VolatileCacheInsertWithEviction) {
|
||||
for (auto nthreads : {1, 5}) {
|
||||
for (auto max_keys : {1 * 1024 * 1024}) {
|
||||
cache_ = std::make_shared<VolatileCacheTier>(/*compressed=*/true,
|
||||
/*size=*/1 * 1024 * 1024);
|
||||
RunInsertTestWithEviction(nthreads, max_keys);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// test table with volatile page cache
|
||||
TEST_F(PersistentCacheDBTest, VolatileCacheTest) {
|
||||
RunTest(std::bind(&PersistentCacheDBTest::MakeVolatileCache, this));
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
#else
|
||||
int main() { return 0; }
|
||||
#endif
|
|
@ -0,0 +1,416 @@
|
|||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#pragma once
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <list>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "rocksdb/cache.h"
|
||||
#include "table/block_builder.h"
|
||||
#include "util/arena.h"
|
||||
#include "util/testharness.h"
|
||||
#include "utilities/persistent_cache/volatile_tier_impl.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
//
|
||||
// Unit tests for testing PersistentCacheTier
|
||||
//
|
||||
class PersistentCacheTierTest : public testing::Test {
|
||||
public:
|
||||
explicit PersistentCacheTierTest()
|
||||
: path_(test::TmpDir(Env::Default()) + "/cache_test") {}
|
||||
|
||||
virtual ~PersistentCacheTierTest() {
|
||||
if (cache_) {
|
||||
Status s = cache_->Close();
|
||||
assert(s.ok());
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
// Flush cache
|
||||
void Flush() {
|
||||
if (cache_) {
|
||||
cache_->Flush();
|
||||
}
|
||||
}
|
||||
|
||||
// create threaded workload
|
||||
template <class T>
|
||||
std::list<std::thread> SpawnThreads(const size_t n, const T& fn) {
|
||||
std::list<std::thread> threads;
|
||||
for (size_t i = 0; i < n; i++) {
|
||||
std::thread th(fn);
|
||||
threads.push_back(std::move(th));
|
||||
}
|
||||
return std::move(threads);
|
||||
}
|
||||
|
||||
// Wait for threads to join
|
||||
void Join(std::list<std::thread>&& threads) {
|
||||
for (auto& th : threads) {
|
||||
th.join();
|
||||
}
|
||||
threads.clear();
|
||||
}
|
||||
|
||||
// Run insert workload in threads
|
||||
void Insert(const size_t nthreads, const size_t max_keys) {
|
||||
key_ = 0;
|
||||
max_keys_ = max_keys;
|
||||
// spawn threads
|
||||
auto fn = std::bind(&PersistentCacheTierTest::InsertImpl, this);
|
||||
auto threads = std::move(SpawnThreads(nthreads, fn));
|
||||
// join with threads
|
||||
Join(std::move(threads));
|
||||
// Flush cache
|
||||
Flush();
|
||||
}
|
||||
|
||||
// Run verification on the cache
|
||||
void Verify(const size_t nthreads = 1, const bool eviction_enabled = false) {
|
||||
stats_verify_hits_ = 0;
|
||||
stats_verify_missed_ = 0;
|
||||
key_ = 0;
|
||||
// spawn threads
|
||||
auto fn =
|
||||
std::bind(&PersistentCacheTierTest::VerifyImpl, this, eviction_enabled);
|
||||
auto threads = std::move(SpawnThreads(nthreads, fn));
|
||||
// join with threads
|
||||
Join(std::move(threads));
|
||||
}
|
||||
|
||||
// pad 0 to numbers
|
||||
std::string PaddedNumber(const size_t data, const size_t pad_size) {
|
||||
assert(pad_size);
|
||||
char* ret = new char[pad_size];
|
||||
int pos = static_cast<int>(pad_size) - 1;
|
||||
size_t count = 0;
|
||||
size_t t = data;
|
||||
// copy numbers
|
||||
while (t) {
|
||||
count++;
|
||||
ret[pos--] = '0' + t % 10;
|
||||
t = t / 10;
|
||||
}
|
||||
// copy 0s
|
||||
while (pos >= 0) {
|
||||
ret[pos--] = '0';
|
||||
}
|
||||
// post condition
|
||||
assert(count <= pad_size);
|
||||
assert(pos == -1);
|
||||
std::string result(ret, pad_size);
|
||||
delete[] ret;
|
||||
return result;
|
||||
}
|
||||
|
||||
// Insert workload implementation
|
||||
void InsertImpl() {
|
||||
const std::string prefix = "key_prefix_";
|
||||
|
||||
while (true) {
|
||||
size_t i = key_++;
|
||||
if (i >= max_keys_) {
|
||||
break;
|
||||
}
|
||||
|
||||
char data[4 * 1024];
|
||||
memset(data, '0' + (i % 10), sizeof(data));
|
||||
auto k = prefix + PaddedNumber(i, /*count=*/8);
|
||||
Slice key(k);
|
||||
while (!cache_->Insert(key, data, sizeof(data)).ok()) {
|
||||
/* sleep override */ sleep(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Verification implementation
|
||||
void VerifyImpl(const bool eviction_enabled = false) {
|
||||
const std::string prefix = "key_prefix_";
|
||||
while (true) {
|
||||
size_t i = key_++;
|
||||
if (i >= max_keys_) {
|
||||
break;
|
||||
}
|
||||
|
||||
char edata[4 * 1024];
|
||||
memset(edata, '0' + (i % 10), sizeof(edata));
|
||||
auto k = prefix + PaddedNumber(i, /*count=*/8);
|
||||
Slice key(k);
|
||||
unique_ptr<char[]> block;
|
||||
size_t block_size;
|
||||
|
||||
if (eviction_enabled) {
|
||||
if (!cache_->Lookup(key, &block, &block_size).ok()) {
|
||||
// assume that the key is evicted
|
||||
stats_verify_missed_++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_OK(cache_->Lookup(key, &block, &block_size));
|
||||
ASSERT_EQ(block_size, sizeof(edata));
|
||||
ASSERT_EQ(memcmp(edata, block.get(), sizeof(edata)), 0);
|
||||
stats_verify_hits_++;
|
||||
}
|
||||
}
|
||||
|
||||
// template for insert test
|
||||
void RunInsertTest(const size_t nthreads, const size_t max_keys) {
|
||||
Insert(nthreads, max_keys);
|
||||
Verify(nthreads);
|
||||
ASSERT_EQ(stats_verify_hits_, max_keys);
|
||||
ASSERT_EQ(stats_verify_missed_, 0);
|
||||
|
||||
cache_->Close();
|
||||
cache_.reset();
|
||||
}
|
||||
|
||||
// template for insert with eviction test
|
||||
void RunInsertTestWithEviction(const size_t nthreads, const size_t max_keys) {
|
||||
Insert(nthreads, max_keys);
|
||||
Verify(nthreads, /*eviction_enabled=*/true);
|
||||
ASSERT_EQ(stats_verify_hits_ + stats_verify_missed_, max_keys);
|
||||
ASSERT_GT(stats_verify_hits_, 0);
|
||||
ASSERT_GT(stats_verify_missed_, 0);
|
||||
|
||||
cache_->Close();
|
||||
cache_.reset();
|
||||
}
|
||||
|
||||
const std::string path_;
|
||||
shared_ptr<Logger> log_;
|
||||
std::shared_ptr<PersistentCacheTier> cache_;
|
||||
std::atomic<size_t> key_{0};
|
||||
size_t max_keys_ = 0;
|
||||
std::atomic<size_t> stats_verify_hits_{0};
|
||||
std::atomic<size_t> stats_verify_missed_{0};
|
||||
};
|
||||
|
||||
//
|
||||
// RocksDB tests
|
||||
//
|
||||
class PersistentCacheDBTest : public DBTestBase {
|
||||
public:
|
||||
PersistentCacheDBTest() : DBTestBase("/cache_test") {
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"GetUniqueIdFromFile:FS_IOC_GETVERSION",
|
||||
PersistentCacheDBTest::UniqueIdCallback);
|
||||
}
|
||||
|
||||
static void UniqueIdCallback(void* arg) {
|
||||
int* result = reinterpret_cast<int*>(arg);
|
||||
if (*result == -1) {
|
||||
*result = 0;
|
||||
}
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->ClearTrace();
|
||||
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
||||
"GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
|
||||
}
|
||||
|
||||
std::shared_ptr<PersistentCacheTier> MakeVolatileCache() {
|
||||
return std::make_shared<VolatileCacheTier>();
|
||||
}
|
||||
|
||||
static uint32_t TestGetTickerCount(const Options& options,
|
||||
Tickers ticker_type) {
|
||||
return options.statistics->getTickerCount(ticker_type);
|
||||
}
|
||||
|
||||
// insert data to table
|
||||
void Insert(const Options& options,
|
||||
const BlockBasedTableOptions& table_options, const int num_iter,
|
||||
std::vector<std::string>* values) {
|
||||
CreateAndReopenWithCF({"pikachu"}, options);
|
||||
// default column family doesn't have block cache
|
||||
Options no_block_cache_opts;
|
||||
no_block_cache_opts.statistics = options.statistics;
|
||||
no_block_cache_opts = CurrentOptions(no_block_cache_opts);
|
||||
BlockBasedTableOptions table_options_no_bc;
|
||||
table_options_no_bc.no_block_cache = true;
|
||||
no_block_cache_opts.table_factory.reset(
|
||||
NewBlockBasedTableFactory(table_options_no_bc));
|
||||
ReopenWithColumnFamilies(
|
||||
{"default", "pikachu"},
|
||||
std::vector<Options>({no_block_cache_opts, options}));
|
||||
|
||||
Random rnd(301);
|
||||
|
||||
// Write 8MB (80 values, each 100K)
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
|
||||
std::string str;
|
||||
for (int i = 0; i < num_iter; i++) {
|
||||
if (i % 4 == 0) { // high compression ratio
|
||||
str = RandomString(&rnd, 1000);
|
||||
}
|
||||
values->push_back(str);
|
||||
ASSERT_OK(Put(1, Key(i), (*values)[i]));
|
||||
}
|
||||
|
||||
// flush all data from memtable so that reads are from block cache
|
||||
ASSERT_OK(Flush(1));
|
||||
}
|
||||
|
||||
// verify data
|
||||
void Verify(const int num_iter, const std::vector<std::string>& values) {
|
||||
for (int j = 0; j < 2; ++j) {
|
||||
for (int i = 0; i < num_iter; i++) {
|
||||
ASSERT_EQ(Get(1, Key(i)), values[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// test template
|
||||
void RunTest(const std::function<std::shared_ptr<PersistentCacheTier>(bool)>&
|
||||
new_pcache) {
|
||||
if (!Snappy_Supported()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// number of insertion interations
|
||||
int num_iter = 100 * 1024;
|
||||
|
||||
for (int iter = 0; iter < 5; iter++) {
|
||||
Options options;
|
||||
options.write_buffer_size = 64 * 1024; // small write buffer
|
||||
options.statistics = rocksdb::CreateDBStatistics();
|
||||
options = CurrentOptions(options);
|
||||
|
||||
// setup page cache
|
||||
std::shared_ptr<PersistentCacheTier> pcache;
|
||||
BlockBasedTableOptions table_options;
|
||||
table_options.cache_index_and_filter_blocks = true;
|
||||
|
||||
const uint64_t uint64_max = std::numeric_limits<uint64_t>::max();
|
||||
|
||||
switch (iter) {
|
||||
case 0:
|
||||
// page cache, block cache, no-compressed cache
|
||||
pcache = new_pcache(/*is_compressed=*/true);
|
||||
table_options.persistent_cache = pcache;
|
||||
table_options.block_cache = NewLRUCache(uint64_max);
|
||||
table_options.block_cache_compressed = nullptr;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
break;
|
||||
case 1:
|
||||
// page cache, block cache, compressed cache
|
||||
pcache = new_pcache(/*is_compressed=*/true);
|
||||
table_options.persistent_cache = pcache;
|
||||
table_options.block_cache = NewLRUCache(uint64_max);
|
||||
table_options.block_cache_compressed = NewLRUCache(uint64_max);
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
break;
|
||||
case 2:
|
||||
// page cache, block cache, compressed cache + KNoCompression
|
||||
// both block cache and compressed cache, but DB is not compressed
|
||||
// also, make block cache sizes bigger, to trigger block cache hits
|
||||
pcache = new_pcache(/*is_compressed=*/true);
|
||||
table_options.persistent_cache = pcache;
|
||||
table_options.block_cache = NewLRUCache(uint64_max);
|
||||
table_options.block_cache_compressed = NewLRUCache(uint64_max);
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
options.compression = kNoCompression;
|
||||
break;
|
||||
case 3:
|
||||
// page cache, no block cache, no compressed cache
|
||||
pcache = new_pcache(/*is_compressed=*/false);
|
||||
table_options.persistent_cache = pcache;
|
||||
table_options.block_cache = nullptr;
|
||||
table_options.block_cache_compressed = nullptr;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
break;
|
||||
case 4:
|
||||
// page cache, no block cache, no compressed cache
|
||||
// Page cache caches compressed blocks
|
||||
pcache = new_pcache(/*is_compressed=*/true);
|
||||
table_options.persistent_cache = pcache;
|
||||
table_options.block_cache = nullptr;
|
||||
table_options.block_cache_compressed = nullptr;
|
||||
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||
break;
|
||||
default:
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
||||
std::vector<std::string> values;
|
||||
// insert data
|
||||
Insert(options, table_options, num_iter, &values);
|
||||
// flush all data in cache to device
|
||||
pcache->Flush();
|
||||
// verify data
|
||||
Verify(num_iter, values);
|
||||
|
||||
auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
|
||||
auto compressed_block_hit =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
|
||||
auto compressed_block_miss =
|
||||
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
|
||||
auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
|
||||
auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);
|
||||
|
||||
// check that we triggered the appropriate code paths in the cache
|
||||
switch (iter) {
|
||||
case 0:
|
||||
// page cache, block cache, no-compressed cache
|
||||
ASSERT_GT(page_miss, 0);
|
||||
ASSERT_GT(page_hit, 0);
|
||||
ASSERT_GT(block_miss, 0);
|
||||
ASSERT_EQ(compressed_block_miss, 0);
|
||||
ASSERT_EQ(compressed_block_hit, 0);
|
||||
break;
|
||||
case 1:
|
||||
// page cache, block cache, compressed cache
|
||||
ASSERT_GT(page_miss, 0);
|
||||
ASSERT_GT(block_miss, 0);
|
||||
ASSERT_GT(compressed_block_miss, 0);
|
||||
break;
|
||||
case 2:
|
||||
// page cache, block cache, compressed cache + KNoCompression
|
||||
ASSERT_GT(page_miss, 0);
|
||||
ASSERT_GT(page_hit, 0);
|
||||
ASSERT_GT(block_miss, 0);
|
||||
ASSERT_GT(compressed_block_miss, 0);
|
||||
// remember kNoCompression
|
||||
ASSERT_EQ(compressed_block_hit, 0);
|
||||
break;
|
||||
case 3:
|
||||
case 4:
|
||||
// page cache, no block cache, no compressed cache
|
||||
ASSERT_GT(page_miss, 0);
|
||||
ASSERT_GT(page_hit, 0);
|
||||
ASSERT_EQ(compressed_block_hit, 0);
|
||||
ASSERT_EQ(compressed_block_miss, 0);
|
||||
break;
|
||||
default:
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
||||
options.create_if_missing = true;
|
||||
DestroyAndReopen(options);
|
||||
|
||||
pcache->Close();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
|
@ -217,11 +217,7 @@ struct PersistentCacheOptions {
|
|||
class PersistentCacheTier : public PersistentCache {
|
||||
public:
|
||||
typedef std::shared_ptr<PersistentCacheTier> Tier;
|
||||
|
||||
struct TierStats {
|
||||
std::vector<HistogramImpl> histograms_;
|
||||
std::map<std::string, size_t> counters_;
|
||||
};
|
||||
typedef std::map<std::string, double> TierStats;
|
||||
|
||||
virtual ~PersistentCacheTier() {}
|
||||
|
||||
|
|
|
@ -0,0 +1,152 @@
|
|||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include "utilities/persistent_cache/volatile_tier_impl.h"
|
||||
|
||||
#include <string>
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
void VolatileCacheTier::DeleteCacheData(VolatileCacheTier::CacheData* data) {
|
||||
assert(data);
|
||||
delete data;
|
||||
}
|
||||
|
||||
VolatileCacheTier::~VolatileCacheTier() { index_.Clear(&DeleteCacheData); }
|
||||
|
||||
std::vector<PersistentCacheTier::TierStats> VolatileCacheTier::Stats() {
|
||||
PersistentCacheTier::TierStats stat;
|
||||
stat.insert({"persistent_cache.volatile_cache.hits", stats_.cache_hits_});
|
||||
stat.insert({"persistent_cache.volatile_cache.misses", stats_.cache_misses_});
|
||||
stat.insert(
|
||||
{"persistent_cache.volatile_cache.inserts", stats_.cache_inserts_});
|
||||
stat.insert({"persistent_cache.volatile_cache.evicts", stats_.cache_evicts_});
|
||||
stat.insert(
|
||||
{"persistent_cache.volatile_cache.hit_pct", stats_.CacheHitPct()});
|
||||
stat.insert(
|
||||
{"persistent_cache.volatile_cache.miss_pct", stats_.CacheMissPct()});
|
||||
|
||||
std::vector<PersistentCacheTier::TierStats> tier_stats;
|
||||
if (next_tier()) {
|
||||
tier_stats = std::move(next_tier()->Stats());
|
||||
}
|
||||
tier_stats.push_back(stat);
|
||||
return tier_stats;
|
||||
}
|
||||
|
||||
std::string VolatileCacheTier::PrintStats() {
|
||||
std::ostringstream ss;
|
||||
ss << "pagecache.volatilecache.hits: " << stats_.cache_hits_ << std::endl
|
||||
<< "pagecache.volatilecache.misses: " << stats_.cache_misses_ << std::endl
|
||||
<< "pagecache.volatilecache.inserts: " << stats_.cache_inserts_
|
||||
<< std::endl
|
||||
<< "pagecache.volatilecache.evicts: " << stats_.cache_evicts_ << std::endl
|
||||
<< "pagecache.volatilecache.hit_pct: " << stats_.CacheHitPct() << std::endl
|
||||
<< "pagecache.volatilecache.miss_pct: " << stats_.CacheMissPct()
|
||||
<< std::endl
|
||||
<< PersistentCacheTier::PrintStats();
|
||||
return std::move(ss.str());
|
||||
}
|
||||
|
||||
Status VolatileCacheTier::Insert(const Slice& page_key, const char* data,
|
||||
const size_t size) {
|
||||
// precondition
|
||||
assert(data);
|
||||
assert(size);
|
||||
|
||||
// increment the size
|
||||
size_ += size;
|
||||
|
||||
// check if we have overshot the limit, if so evict some space
|
||||
while (size_ > max_size_) {
|
||||
if (!Evict()) {
|
||||
// unable to evict data, we give up so we don't spike read
|
||||
// latency
|
||||
assert(size_ >= size);
|
||||
size_ -= size;
|
||||
return Status::TryAgain("Unable to evict any data");
|
||||
}
|
||||
}
|
||||
|
||||
assert(size_ >= size);
|
||||
|
||||
// insert order: LRU, followed by index
|
||||
std::string key(page_key.data(), page_key.size());
|
||||
std::string value(data, size);
|
||||
std::unique_ptr<CacheData> cache_data(
|
||||
new CacheData(std::move(key), std::move(value)));
|
||||
bool ok = index_.Insert(cache_data.get());
|
||||
if (!ok) {
|
||||
// decrement the size that we incremented ahead of time
|
||||
assert(size_ >= size);
|
||||
size_ -= size;
|
||||
// failed to insert to cache, block already in cache
|
||||
return Status::TryAgain("key already exists in volatile cache");
|
||||
}
|
||||
|
||||
cache_data.release();
|
||||
stats_.cache_inserts_++;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VolatileCacheTier::Lookup(const Slice& page_key,
|
||||
std::unique_ptr<char[]>* result,
|
||||
size_t* size) {
|
||||
CacheData key(std::move(page_key.ToString()));
|
||||
CacheData* kv;
|
||||
bool ok = index_.Find(&key, &kv);
|
||||
if (ok) {
|
||||
// set return data
|
||||
result->reset(new char[kv->value.size()]);
|
||||
memcpy(result->get(), kv->value.c_str(), kv->value.size());
|
||||
*size = kv->value.size();
|
||||
// drop the reference on cache data
|
||||
kv->refs_--;
|
||||
// update stats
|
||||
stats_.cache_hits_++;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
stats_.cache_misses_++;
|
||||
|
||||
if (next_tier()) {
|
||||
return next_tier()->Lookup(page_key, result, size);
|
||||
}
|
||||
|
||||
return Status::NotFound("key not found in volatile cache");
|
||||
}
|
||||
|
||||
bool VolatileCacheTier::Erase(const Slice& key) {
|
||||
assert(!"not supported");
|
||||
return true;
|
||||
}
|
||||
|
||||
bool VolatileCacheTier::Evict() {
|
||||
CacheData* edata = index_.Evict();
|
||||
if (!edata) {
|
||||
// not able to evict any object
|
||||
return false;
|
||||
}
|
||||
|
||||
stats_.cache_evicts_++;
|
||||
|
||||
// push the evicted object to the next level
|
||||
if (next_tier()) {
|
||||
next_tier()->Insert(Slice(edata->key), edata->value.c_str(),
|
||||
edata->value.size());
|
||||
}
|
||||
|
||||
// adjust size and destroy data
|
||||
size_ -= edata->value.size();
|
||||
delete edata;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
|
@ -0,0 +1,140 @@
|
|||
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
#pragma once
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
#include <atomic>
|
||||
#include <limits>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/cache.h"
|
||||
#include "utilities/persistent_cache/hash_table.h"
|
||||
#include "utilities/persistent_cache/hash_table_evictable.h"
|
||||
#include "utilities/persistent_cache/persistent_cache_tier.h"
|
||||
|
||||
// VolatileCacheTier
|
||||
//
|
||||
// This file provides persistent cache tier implementation for caching
|
||||
// key/values in RAM.
|
||||
//
|
||||
// key/values
|
||||
// |
|
||||
// V
|
||||
// +-------------------+
|
||||
// | VolatileCacheTier | Store in an evictable hash table
|
||||
// +-------------------+
|
||||
// |
|
||||
// V
|
||||
// on eviction
|
||||
// pushed to next tier
|
||||
//
|
||||
// The implementation is designed to be concurrent. The evictable hash table
|
||||
// implementation is not concurrent at this point though.
|
||||
//
|
||||
// The eviction algorithm is LRU
|
||||
namespace rocksdb {
|
||||
|
||||
class VolatileCacheTier : public PersistentCacheTier {
|
||||
public:
|
||||
explicit VolatileCacheTier(
|
||||
const bool is_compressed = true,
|
||||
const size_t max_size = std::numeric_limits<size_t>::max())
|
||||
: is_compressed_(is_compressed), max_size_(max_size) {}
|
||||
|
||||
virtual ~VolatileCacheTier();
|
||||
|
||||
// insert to cache
|
||||
Status Insert(const Slice& page_key, const char* data,
|
||||
const size_t size) override;
|
||||
// lookup key in cache
|
||||
Status Lookup(const Slice& page_key, std::unique_ptr<char[]>* data,
|
||||
size_t* size) override;
|
||||
|
||||
// is compressed cache ?
|
||||
bool IsCompressed() override { return is_compressed_; }
|
||||
|
||||
// erase key from cache
|
||||
bool Erase(const Slice& key) override;
|
||||
|
||||
// Expose stats as map
|
||||
std::vector<TierStats> Stats() override;
|
||||
|
||||
// Print stats to string
|
||||
std::string PrintStats() override;
|
||||
|
||||
private:
|
||||
//
|
||||
// Cache data abstraction
|
||||
//
|
||||
struct CacheData : LRUElement<CacheData> {
|
||||
explicit CacheData(CacheData&& rhs) noexcept
|
||||
: key(std::move(rhs.key)), value(std::move(rhs.value)) {}
|
||||
|
||||
explicit CacheData(const std::string& _key, const std::string& _value = "")
|
||||
: key(_key), value(_value) {}
|
||||
|
||||
virtual ~CacheData() {}
|
||||
|
||||
const std::string key;
|
||||
const std::string value;
|
||||
};
|
||||
|
||||
static void DeleteCacheData(CacheData* data);
|
||||
|
||||
//
|
||||
// Index and LRU definition
|
||||
//
|
||||
struct CacheDataHash {
|
||||
uint64_t operator()(const CacheData* obj) const {
|
||||
assert(obj);
|
||||
return std::hash<std::string>()(obj->key);
|
||||
}
|
||||
};
|
||||
|
||||
struct CacheDataEqual {
|
||||
bool operator()(const CacheData* lhs, const CacheData* rhs) const {
|
||||
assert(lhs);
|
||||
assert(rhs);
|
||||
return lhs->key == rhs->key;
|
||||
}
|
||||
};
|
||||
|
||||
struct Statistics {
|
||||
uint64_t cache_misses_ = 0;
|
||||
uint64_t cache_hits_ = 0;
|
||||
uint64_t cache_inserts_ = 0;
|
||||
uint64_t cache_evicts_ = 0;
|
||||
|
||||
double CacheHitPct() const {
|
||||
auto lookups = cache_hits_ + cache_misses_;
|
||||
return lookups ? 100 * cache_hits_ / static_cast<double>(lookups) : 0.0;
|
||||
}
|
||||
|
||||
double CacheMissPct() const {
|
||||
auto lookups = cache_hits_ + cache_misses_;
|
||||
return lookups ? 100 * cache_misses_ / static_cast<double>(lookups) : 0.0;
|
||||
}
|
||||
};
|
||||
|
||||
typedef EvictableHashTable<CacheData, CacheDataHash, CacheDataEqual>
|
||||
IndexType;
|
||||
|
||||
// Evict LRU tail
|
||||
bool Evict();
|
||||
|
||||
const bool is_compressed_ = true; // does it store compressed data
|
||||
IndexType index_; // in-memory cache
|
||||
std::atomic<uint64_t> max_size_{0}; // Maximum size of the cache
|
||||
std::atomic<uint64_t> size_{0}; // Size of the cache
|
||||
Statistics stats_;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue