Add TieredCache and compressed cache capacity change to db_stress (#11935)

Summary:
Add `TieredCache` to the cache types tested by db_stress. Also add compressed secondary cache capacity change, and `WriteBufferManager` integration with `TieredCache` for memory charging.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11935

Test Plan: Run whitebox/blackbox crash tests locally

Reviewed By: akankshamahajan15

Differential Revision: D50135365

Pulled By: anand1976

fbshipit-source-id: 7d73ed00c00a0953d86e49f35cce6bd550ba00f1
This commit is contained in:
anand76 2023-10-10 13:12:18 -07:00 committed by Facebook GitHub Bot
parent 98ab2d80fa
commit 5b11f5a3a2
7 changed files with 220 additions and 27 deletions

View File

@ -507,7 +507,6 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
return s;
}
assert(old_sec_capacity >= pri_cache_res_->GetTotalMemoryUsed());
size_t old_sec_reserved =
old_sec_capacity - pri_cache_res_->GetTotalMemoryUsed();
// Calculate the new secondary cache reservation
@ -527,7 +526,6 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
// cache utilization (increase in capacity - increase in share of cache
// reservation)
// 3. Increase secondary cache capacity
assert(sec_reserved > old_sec_reserved || sec_reserved == 0);
s = secondary_cache_->Deflate(sec_reserved - old_sec_reserved);
assert(s.ok());
s = pri_cache_res_->UpdateCacheReservation(
@ -544,7 +542,6 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
// reservations)
// 3. Inflate the secondary cache to give it back the reduction in its
// share of cache reservations
assert(old_sec_reserved > sec_reserved || sec_reserved == 0);
s = secondary_cache_->SetCapacity(sec_capacity);
if (s.ok()) {
s = pri_cache_res_->UpdateCacheReservation(

View File

@ -13,6 +13,7 @@
#include <cmath>
#include "rocksdb/secondary_cache.h"
#include "util/file_checksum_helper.h"
#include "util/xxhash.h"
@ -21,6 +22,8 @@ ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
// If non-null, injects read error at a rate specified by the
// read_fault_one_in or write_fault_one_in flag
std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> compressed_secondary_cache;
std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
ROCKSDB_NAMESPACE::kSnappyCompression;
enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
@ -148,6 +151,88 @@ void DbVerificationThread(void* v) {
}
}
// Background thread that periodically perturbs the compressed (secondary)
// cache configuration to stress dynamic capacity changes.
//
// Two modes, chosen by which flag is set:
//  - FLAGS_compressed_secondary_cache_size > 0: a standalone compressed
//    secondary cache exists; shrink it to 0, wait, then restore it.
//  - FLAGS_compressed_secondary_cache_ratio > 0.0: a tiered cache is in
//    use; randomly either shrink/restore the total block cache capacity,
//    or change the compressed-secondary share of it via UpdateTieredCache.
//
// `v` is a ThreadState*. The thread exits when SharedState signals that
// background threads should stop, and notifies the main thread via the
// shared condition variable once it is the last bg thread to finish.
void CompressedCacheSetCapacityThread(void* v) {
  assert(FLAGS_compressed_secondary_cache_size > 0 ||
         FLAGS_compressed_secondary_cache_ratio > 0.0);
  auto* thread = reinterpret_cast<ThreadState*>(v);
  SharedState* shared = thread->shared;
  while (true) {
    {
      // Check for shutdown under the shared mutex before each cycle.
      MutexLock l(shared->GetMutex());
      if (shared->ShouldStopBgThread()) {
        shared->IncBgThreadsFinished();
        if (shared->BgThreadsFinished()) {
          shared->GetCondVar()->SignalAll();
        }
        return;
      }
    }
    db_stress_env->SleepForMicroseconds(FLAGS_secondary_cache_update_interval);
    if (FLAGS_compressed_secondary_cache_size > 0) {
      // Shrink the standalone compressed secondary cache to zero, verify,
      // let the test run without it for 10s, then restore the capacity.
      Status s = compressed_secondary_cache->SetCapacity(0);
      size_t capacity;
      if (s.ok()) {
        s = compressed_secondary_cache->GetCapacity(capacity);
        assert(capacity == 0);
      }
      db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
      if (s.ok()) {
        s = compressed_secondary_cache->SetCapacity(
            FLAGS_compressed_secondary_cache_size);
      }
      if (s.ok()) {
        s = compressed_secondary_cache->GetCapacity(capacity);
        assert(capacity == FLAGS_compressed_secondary_cache_size);
      }
      if (!s.ok()) {
        fprintf(stderr, "Compressed cache Set/GetCapacity returned error: %s\n",
                s.ToString().c_str());
      }
    } else if (FLAGS_compressed_secondary_cache_ratio > 0.0) {
      if (thread->rand.OneIn(2)) {
        // Temporarily reduce the total tiered cache capacity.
        size_t capacity = block_cache->GetCapacity();
        size_t adjustment;
        if (FLAGS_use_write_buffer_manager && FLAGS_db_write_buffer_size > 0) {
          // Don't count the slice reserved for the WriteBufferManager as
          // adjustable block cache capacity.
          adjustment = (capacity - FLAGS_db_write_buffer_size);
        } else {
          adjustment = capacity;
        }
        // Lower by up to 50% of usable block cache capacity
        adjustment = (adjustment * thread->rand.Uniform(50)) / 100;
        block_cache->SetCapacity(capacity - adjustment);
        // Use %zu: GetCapacity() returns size_t, and %lu is wrong on
        // platforms where size_t != unsigned long (e.g. 64-bit Windows).
        fprintf(stderr, "New cache capacity = %zu\n",
                block_cache->GetCapacity());
        db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
        block_cache->SetCapacity(capacity);
      } else {
        // Temporarily change the compressed-secondary share of the tiered
        // cache, then restore the configured ratio.
        Status s;
        double new_comp_cache_ratio =
            static_cast<double>(thread->rand.Uniform(
                FLAGS_compressed_secondary_cache_ratio * 100)) /
            100;
        if (new_comp_cache_ratio == 0.0) {
          // Avoid a zero ratio, which would disable the compressed tier.
          new_comp_cache_ratio = 0.05;
        }
        fprintf(stderr, "New comp cache ratio = %f\n", new_comp_cache_ratio);

        s = UpdateTieredCache(block_cache, /*capacity*/ -1,
                              new_comp_cache_ratio);
        if (s.ok()) {
          db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
        }
        if (s.ok()) {
          s = UpdateTieredCache(block_cache, /*capacity*/ -1,
                                FLAGS_compressed_secondary_cache_ratio);
        }
        if (!s.ok()) {
          fprintf(stderr, "UpdateTieredCache returned error: %s\n",
                  s.ToString().c_str());
        }
      }
    }
  }
}
void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
if (!FLAGS_verbose) {
return;

View File

@ -108,11 +108,14 @@ DECLARE_int32(max_write_buffer_number);
DECLARE_int32(min_write_buffer_number_to_merge);
DECLARE_int32(max_write_buffer_number_to_maintain);
DECLARE_int64(max_write_buffer_size_to_maintain);
DECLARE_bool(use_write_buffer_manager);
DECLARE_double(memtable_prefix_bloom_size_ratio);
DECLARE_bool(memtable_whole_key_filtering);
DECLARE_int32(open_files);
DECLARE_int64(compressed_cache_size);
DECLARE_int32(compressed_cache_numshardbits);
DECLARE_uint64(compressed_secondary_cache_size);
DECLARE_int32(compressed_secondary_cache_numshardbits);
DECLARE_int32(secondary_cache_update_interval);
DECLARE_double(compressed_secondary_cache_ratio);
DECLARE_int32(compaction_style);
DECLARE_int32(compaction_pri);
DECLARE_int32(num_levels);
@ -358,6 +361,9 @@ constexpr int kValueMaxLen = 100;
extern ROCKSDB_NAMESPACE::Env* db_stress_env;
extern ROCKSDB_NAMESPACE::Env* db_stress_listener_env;
extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
extern std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache>
compressed_secondary_cache;
extern std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;
@ -650,6 +656,8 @@ extern void PoolSizeChangeThread(void* v);
extern void DbVerificationThread(void* v);
extern void CompressedCacheSetCapacityThread(void* v);
extern void TimestampedSnapshotsThread(void* v);
extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz);

View File

@ -121,6 +121,11 @@ bool RunStressTestImpl(SharedState* shared) {
shared->IncBgThreads();
}
if (FLAGS_compressed_secondary_cache_size > 0 ||
FLAGS_compressed_secondary_cache_ratio > 0.0) {
shared->IncBgThreads();
}
std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, shared);
@ -138,6 +143,13 @@ bool RunStressTestImpl(SharedState* shared) {
&continuous_verification_thread);
}
ThreadState compressed_cache_set_capacity_thread(0, shared);
if (FLAGS_compressed_secondary_cache_size > 0 ||
FLAGS_compressed_secondary_cache_ratio > 0.0) {
db_stress_env->StartThread(CompressedCacheSetCapacityThread,
&compressed_cache_set_capacity_thread);
}
// Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate
// wait for others to operate -> verify -> done
@ -230,7 +242,9 @@ bool RunStressTestImpl(SharedState* shared) {
}
if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
FLAGS_continuous_verification_interval > 0) {
FLAGS_continuous_verification_interval > 0 ||
FLAGS_compressed_secondary_cache_size > 0 ||
FLAGS_compressed_secondary_cache_ratio > 0.0) {
MutexLock l(shared->GetMutex());
shared->SetShouldStopBgThread();
while (!shared->BgThreadsFinished()) {

View File

@ -136,6 +136,9 @@ DEFINE_uint64(db_write_buffer_size,
ROCKSDB_NAMESPACE::Options().db_write_buffer_size,
"Number of bytes to buffer in all memtables before compacting");
DEFINE_bool(use_write_buffer_manager, false,
"Charge WriteBufferManager memory to the block cache");
DEFINE_int32(
write_buffer_size,
static_cast<int32_t>(ROCKSDB_NAMESPACE::Options().write_buffer_size),
@ -198,15 +201,23 @@ DEFINE_int32(open_files, ROCKSDB_NAMESPACE::Options().max_open_files,
"Maximum number of files to keep open at the same time "
"(use default if == 0)");
DEFINE_int64(compressed_cache_size, 0,
"Number of bytes to use as a cache of compressed data."
" 0 means use default settings.");
DEFINE_uint64(compressed_secondary_cache_size, 0,
"Number of bytes to use as a cache of compressed data."
" 0 means use default settings.");
DEFINE_int32(
compressed_cache_numshardbits, -1,
"Number of shards for the compressed block cache is 2 ** "
"compressed_cache_numshardbits. Negative value means default settings. "
"This is applied only if compressed_cache_size is greater than 0.");
DEFINE_int32(compressed_secondary_cache_numshardbits, -1,
"Number of shards for the compressed secondary cache is 2 ** "
"compressed_secondary_cache_numshardbits. "
"Negative value means default settings. This is applied only "
"if compressed_secondary_cache_size is greater than 0.");
DEFINE_double(compressed_secondary_cache_ratio, 0.0,
"Fraction of block cache memory budget to use for compressed "
"secondary cache");
DEFINE_int32(secondary_cache_update_interval, 30 * 1000 * 1000,
"Interval between modification of secondary cache parameters, in "
"microseconds");
DEFINE_int32(compaction_style, ROCKSDB_NAMESPACE::Options().compaction_style,
"");
@ -1023,6 +1034,9 @@ DEFINE_string(secondary_cache_uri, "",
DEFINE_int32(secondary_cache_fault_one_in, 0,
"On non-zero, enables fault injection in secondary cache inserts"
" and lookups");
DEFINE_double(tiered_cache_percent_compressed, 0.0,
"Percentage of total block cache budget to allocate to the "
"compressed cache");
DEFINE_int32(open_write_fault_one_in, 0,
"On non-zero, enables fault injection on file writes "
"during DB reopen.");

View File

@ -112,6 +112,11 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
std::shared_ptr<SecondaryCache> secondary_cache;
if (!FLAGS_secondary_cache_uri.empty()) {
assert(!strstr(FLAGS_secondary_cache_uri.c_str(),
"compressed_secondary_cache") ||
(FLAGS_compressed_secondary_cache_size == 0 &&
FLAGS_compressed_secondary_cache_ratio == 0.0 &&
!StartsWith(FLAGS_cache_type, "tiered_")));
Status s = SecondaryCache::CreateFromString(
config_options, FLAGS_secondary_cache_uri, &secondary_cache);
if (secondary_cache == nullptr) {
@ -125,36 +130,81 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
secondary_cache, static_cast<uint32_t>(FLAGS_seed),
FLAGS_secondary_cache_fault_one_in);
}
} else if (FLAGS_compressed_secondary_cache_size > 0) {
if (StartsWith(FLAGS_cache_type, "tiered_")) {
fprintf(stderr,
"Cannot specify both compressed_secondary_cache_size and %s\n",
FLAGS_cache_type.c_str());
exit(1);
}
CompressedSecondaryCacheOptions opts;
opts.capacity = FLAGS_compressed_secondary_cache_size;
secondary_cache = NewCompressedSecondaryCache(opts);
if (secondary_cache == nullptr) {
fprintf(stderr, "Failed to allocate compressed secondary cache\n");
exit(1);
}
compressed_secondary_cache = secondary_cache;
}
if (FLAGS_cache_type == "clock_cache") {
std::string cache_type = FLAGS_cache_type;
size_t cache_size = FLAGS_cache_size;
bool tiered = false;
if (StartsWith(cache_type, "tiered_")) {
tiered = true;
cache_type.erase(0, strlen("tiered_"));
}
if (FLAGS_use_write_buffer_manager) {
cache_size += FLAGS_db_write_buffer_size;
}
if (cache_type == "clock_cache") {
fprintf(stderr, "Old clock cache implementation has been removed.\n");
exit(1);
} else if (EndsWith(FLAGS_cache_type, "hyper_clock_cache")) {
} else if (EndsWith(cache_type, "hyper_clock_cache")) {
size_t estimated_entry_charge;
if (FLAGS_cache_type == "fixed_hyper_clock_cache" ||
FLAGS_cache_type == "hyper_clock_cache") {
if (cache_type == "fixed_hyper_clock_cache" ||
cache_type == "hyper_clock_cache") {
estimated_entry_charge = FLAGS_block_size;
} else if (FLAGS_cache_type == "auto_hyper_clock_cache") {
} else if (cache_type == "auto_hyper_clock_cache") {
estimated_entry_charge = 0;
} else {
fprintf(stderr, "Cache type not supported.");
exit(1);
}
HyperClockCacheOptions opts(FLAGS_cache_size, estimated_entry_charge,
HyperClockCacheOptions opts(cache_size, estimated_entry_charge,
num_shard_bits);
opts.hash_seed = BitwiseAnd(FLAGS_seed, INT32_MAX);
return opts.MakeSharedCache();
} else if (FLAGS_cache_type == "lru_cache") {
if (tiered) {
TieredCacheOptions tiered_opts;
tiered_opts.cache_opts = &opts;
tiered_opts.cache_type = PrimaryCacheType::kCacheTypeHCC;
tiered_opts.total_capacity = cache_size;
tiered_opts.compressed_secondary_ratio = 0.5;
block_cache = NewTieredCache(tiered_opts);
} else {
opts.secondary_cache = std::move(secondary_cache);
block_cache = opts.MakeSharedCache();
}
} else if (EndsWith(cache_type, "lru_cache")) {
LRUCacheOptions opts;
opts.capacity = capacity;
opts.num_shard_bits = num_shard_bits;
opts.secondary_cache = std::move(secondary_cache);
return NewLRUCache(opts);
if (tiered) {
TieredCacheOptions tiered_opts;
tiered_opts.cache_opts = &opts;
tiered_opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
tiered_opts.total_capacity = cache_size;
tiered_opts.compressed_secondary_ratio = 0.5;
block_cache = NewTieredCache(tiered_opts);
} else {
opts.secondary_cache = std::move(secondary_cache);
block_cache = NewLRUCache(opts);
}
} else {
fprintf(stderr, "Cache type not supported.");
exit(1);
}
return block_cache;
}
std::vector<std::string> StressTest::GetBlobCompressionTags() {
@ -3153,6 +3203,10 @@ void InitializeOptionsFromFlags(
FLAGS_max_write_buffer_size_to_maintain;
options.memtable_prefix_bloom_size_ratio =
FLAGS_memtable_prefix_bloom_size_ratio;
if (FLAGS_use_write_buffer_manager) {
options.write_buffer_manager.reset(
new WriteBufferManager(FLAGS_db_write_buffer_size, block_cache));
}
options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
options.disable_auto_compactions = FLAGS_disable_auto_compactions;
options.max_background_compactions = FLAGS_max_background_compactions;

View File

@ -43,7 +43,7 @@ default_params = {
[random.randint(0, 19), random.lognormvariate(2.3, 1.3)]
),
"cache_index_and_filter_blocks": lambda: random.randint(0, 1),
"cache_size": 8388608,
"cache_size": lambda: random.choice([8388608, 33554432]),
"charge_compression_dictionary_building_buffer": lambda: random.choice([0, 1]),
"charge_filter_construction": lambda: random.choice([0, 1]),
"charge_table_reader": lambda: random.choice([0, 1]),
@ -126,7 +126,9 @@ default_params = {
"mock_direct_io": False,
"cache_type": lambda: random.choice(
["lru_cache", "fixed_hyper_clock_cache", "auto_hyper_clock_cache",
"auto_hyper_clock_cache"]
"auto_hyper_clock_cache", "tiered_lru_cache",
"tiered_fixed_hyper_clock_cache", "tiered_auto_hyper_clock_cache",
"tiered_auto_hyper_clock_cache"]
),
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
@ -163,6 +165,7 @@ default_params = {
"db_write_buffer_size": lambda: random.choice(
[0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]
),
"use_write_buffer_manager": lambda: random.randint(0,1),
"avoid_unnecessary_blocking_io": random.randint(0, 1),
"write_dbid_to_manifest": random.randint(0, 1),
"avoid_flush_during_recovery": lambda: random.choice(
@ -191,6 +194,7 @@ default_params = {
),
"user_timestamp_size": 0,
"secondary_cache_fault_one_in": lambda: random.choice([0, 0, 32]),
"compressed_secondary_cache_size": lambda: random.choice([8388608, 16777216]),
"prepopulate_block_cache": lambda: random.choice([0, 1]),
"memtable_prefix_bloom_size_ratio": lambda: random.choice([0.001, 0.01, 0.1, 0.5]),
"memtable_whole_key_filtering": lambda: random.randint(0, 1),
@ -202,7 +206,8 @@ default_params = {
"secondary_cache_uri": lambda: random.choice(
[
"",
"compressed_secondary_cache://capacity=8388608",
"",
"",
"compressed_secondary_cache://capacity=8388608;enable_custom_split_merge=true",
]
),
@ -681,6 +686,22 @@ def finalize_and_sanitize(src_params):
if dest_params["write_fault_one_in"] > 0:
# background work may be disabled while DB is resuming after some error
dest_params["max_write_buffer_number"] = max(dest_params["max_write_buffer_number"], 10)
if dest_params["secondary_cache_uri"].find("compressed_secondary_cache") >= 0:
dest_params["compressed_secondary_cache_size"] = 0
dest_params["compressed_secondary_cache_ratio"] = 0.0
if dest_params["cache_type"].find("tiered_") >= 0:
if dest_params["compressed_secondary_cache_size"] > 0:
dest_params["compressed_secondary_cache_ratio"] = \
float(dest_params["compressed_secondary_cache_size"]/ \
(dest_params["cache_size"] + dest_params["compressed_secondary_cache_size"]))
dest_params["compressed_secondary_cache_size"] = 0
else:
dest_params["compressed_secondary_cache_ratio"] = 0.0
dest_params["cache_type"] = dest_params["cache_type"].replace("tiered_", "")
if dest_params["use_write_buffer_manager"]:
if (dest_params["cache_size"] <= 0
or dest_params["db_write_buffer_size"] <= 0):
dest_params["use_write_buffer_manager"] = 0
return dest_params