diff --git a/cache/secondary_cache_adapter.cc b/cache/secondary_cache_adapter.cc
index d6b347246a..b378197ccf 100644
--- a/cache/secondary_cache_adapter.cc
+++ b/cache/secondary_cache_adapter.cc
@@ -507,7 +507,6 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
     return s;
   }
 
-  assert(old_sec_capacity >= pri_cache_res_->GetTotalMemoryUsed());
   size_t old_sec_reserved =
       old_sec_capacity - pri_cache_res_->GetTotalMemoryUsed();
   // Calculate the new secondary cache reservation
@@ -527,7 +526,6 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
     // cache utilization (increase in capacity - increase in share of cache
     // reservation)
     // 3. Increase secondary cache capacity
-    assert(sec_reserved > old_sec_reserved || sec_reserved == 0);
     s = secondary_cache_->Deflate(sec_reserved - old_sec_reserved);
     assert(s.ok());
     s = pri_cache_res_->UpdateCacheReservation(
@@ -544,7 +542,6 @@ Status CacheWithSecondaryAdapter::UpdateCacheReservationRatio(
     // reservations)
     // 3. Inflate the secondary cache to give it back the reduction in its
     // share of cache reservations
-    assert(old_sec_reserved > sec_reserved || sec_reserved == 0);
     s = secondary_cache_->SetCapacity(sec_capacity);
     if (s.ok()) {
       s = pri_cache_res_->UpdateCacheReservation(
diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc
index 6a59866655..c7a8ef0a2e 100644
--- a/db_stress_tool/db_stress_common.cc
+++ b/db_stress_tool/db_stress_common.cc
@@ -13,6 +13,7 @@
 
 #include <cmath>
 
+#include "rocksdb/secondary_cache.h"
 #include "util/file_checksum_helper.h"
 #include "util/xxhash.h"
 
@@ -21,6 +22,8 @@ ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr;
 // If non-null, injects read error at a rate specified by the
 // read_fault_one_in or write_fault_one_in flag
 std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
+std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache> compressed_secondary_cache;
+std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
 enum ROCKSDB_NAMESPACE::CompressionType compression_type_e =
     ROCKSDB_NAMESPACE::kSnappyCompression;
 enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e =
@@ -148,6 +151,88 @@ void DbVerificationThread(void* v) {
   }
 }
 
+void CompressedCacheSetCapacityThread(void* v) {
+  assert(FLAGS_compressed_secondary_cache_size > 0 ||
+         FLAGS_compressed_secondary_cache_ratio > 0.0);
+  auto* thread = reinterpret_cast<ThreadState*>(v);
+  SharedState* shared = thread->shared;
+  while (true) {
+    {
+      MutexLock l(shared->GetMutex());
+      if (shared->ShouldStopBgThread()) {
+        shared->IncBgThreadsFinished();
+        if (shared->BgThreadsFinished()) {
+          shared->GetCondVar()->SignalAll();
+        }
+        return;
+      }
+    }
+    db_stress_env->SleepForMicroseconds(FLAGS_secondary_cache_update_interval);
+    if (FLAGS_compressed_secondary_cache_size > 0) {
+      Status s = compressed_secondary_cache->SetCapacity(0);
+      size_t capacity;
+      if (s.ok()) {
+        s = compressed_secondary_cache->GetCapacity(capacity);
+        assert(capacity == 0);
+      }
+      db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
+      if (s.ok()) {
+        s = compressed_secondary_cache->SetCapacity(
+            FLAGS_compressed_secondary_cache_size);
+      }
+      if (s.ok()) {
+        s = compressed_secondary_cache->GetCapacity(capacity);
+        assert(capacity == FLAGS_compressed_secondary_cache_size);
+      }
+      if (!s.ok()) {
+        fprintf(stderr, "Compressed cache Set/GetCapacity returned error: %s\n",
+                s.ToString().c_str());
+      }
+    } else if (FLAGS_compressed_secondary_cache_ratio > 0.0) {
+      if (thread->rand.OneIn(2)) {
+        size_t capacity = block_cache->GetCapacity();
+        size_t adjustment;
+        if (FLAGS_use_write_buffer_manager && FLAGS_db_write_buffer_size > 0) {
+          adjustment = (capacity - FLAGS_db_write_buffer_size);
+        } else {
+          adjustment = capacity;
+        }
+        // Lower by up to 50% of usable block cache capacity
+        adjustment = (adjustment * thread->rand.Uniform(50)) / 100;
+        block_cache->SetCapacity(capacity - adjustment);
+        fprintf(stderr, "New cache capacity = %lu\n",
+                block_cache->GetCapacity());
+        db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
+        block_cache->SetCapacity(capacity);
+      } else {
+        Status s;
+        double new_comp_cache_ratio =
+            (double)thread->rand.Uniform(
+                FLAGS_compressed_secondary_cache_ratio * 100) /
+            100;
+        if (new_comp_cache_ratio == 0.0) {
+          new_comp_cache_ratio = 0.05;
+        }
+        fprintf(stderr, "New comp cache ratio = %f\n", new_comp_cache_ratio);
+
+        s = UpdateTieredCache(block_cache, /*capacity*/ -1,
+                              new_comp_cache_ratio);
+        if (s.ok()) {
+          db_stress_env->SleepForMicroseconds(10 * 1000 * 1000);
+        }
+        if (s.ok()) {
+          s = UpdateTieredCache(block_cache, /*capacity*/ -1,
+                                FLAGS_compressed_secondary_cache_ratio);
+        }
+        if (!s.ok()) {
+          fprintf(stderr, "UpdateTieredCache returned error: %s\n",
+                  s.ToString().c_str());
+        }
+      }
+    }
+  }
+}
+
 void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
   if (!FLAGS_verbose) {
     return;
diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h
index 93b5f32d29..485400e05b 100644
--- a/db_stress_tool/db_stress_common.h
+++ b/db_stress_tool/db_stress_common.h
@@ -108,11 +108,14 @@ DECLARE_int32(max_write_buffer_number);
 DECLARE_int32(min_write_buffer_number_to_merge);
 DECLARE_int32(max_write_buffer_number_to_maintain);
 DECLARE_int64(max_write_buffer_size_to_maintain);
+DECLARE_bool(use_write_buffer_manager);
 DECLARE_double(memtable_prefix_bloom_size_ratio);
 DECLARE_bool(memtable_whole_key_filtering);
 DECLARE_int32(open_files);
-DECLARE_int64(compressed_cache_size);
-DECLARE_int32(compressed_cache_numshardbits);
+DECLARE_uint64(compressed_secondary_cache_size);
+DECLARE_int32(compressed_secondary_cache_numshardbits);
+DECLARE_int32(secondary_cache_update_interval);
+DECLARE_double(compressed_secondary_cache_ratio);
 DECLARE_int32(compaction_style);
 DECLARE_int32(compaction_pri);
 DECLARE_int32(num_levels);
@@ -358,6 +361,9 @@ constexpr int kValueMaxLen = 100;
 extern ROCKSDB_NAMESPACE::Env* db_stress_env;
 extern ROCKSDB_NAMESPACE::Env* db_stress_listener_env;
 extern std::shared_ptr<ROCKSDB_NAMESPACE::FaultInjectionTestFS> fault_fs_guard;
+extern std::shared_ptr<ROCKSDB_NAMESPACE::SecondaryCache>
+    compressed_secondary_cache;
+extern std::shared_ptr<ROCKSDB_NAMESPACE::Cache> block_cache;
 extern enum ROCKSDB_NAMESPACE::CompressionType compression_type_e;
 extern enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e;
 
@@ -650,6 +656,8 @@ extern void PoolSizeChangeThread(void* v);
 
 extern void DbVerificationThread(void* v);
 
+extern void CompressedCacheSetCapacityThread(void* v);
+
 extern void TimestampedSnapshotsThread(void* v);
 
 extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz);
diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc
index 2ab0b0d715..92730beca2 100644
--- a/db_stress_tool/db_stress_driver.cc
+++ b/db_stress_tool/db_stress_driver.cc
@@ -121,6 +121,11 @@ bool RunStressTestImpl(SharedState* shared) {
     shared->IncBgThreads();
   }
 
+  if (FLAGS_compressed_secondary_cache_size > 0 ||
+      FLAGS_compressed_secondary_cache_ratio > 0.0) {
+    shared->IncBgThreads();
+  }
+
   std::vector<ThreadState*> threads(n);
   for (uint32_t i = 0; i < n; i++) {
     threads[i] = new ThreadState(i, shared);
@@ -138,6 +143,13 @@ bool RunStressTestImpl(SharedState* shared) {
                                &continuous_verification_thread);
   }
 
+  ThreadState compressed_cache_set_capacity_thread(0, shared);
+  if (FLAGS_compressed_secondary_cache_size > 0 ||
+      FLAGS_compressed_secondary_cache_ratio > 0.0) {
+    db_stress_env->StartThread(CompressedCacheSetCapacityThread,
+                               &compressed_cache_set_capacity_thread);
+  }
+
   // Each thread goes through the following states:
   // initializing -> wait for others to init -> read/populate/depopulate
   // wait for others to operate -> verify -> done
@@ -230,7 +242,9 @@ bool RunStressTestImpl(SharedState* shared) {
   }
 
   if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
-      FLAGS_continuous_verification_interval > 0) {
+      FLAGS_continuous_verification_interval > 0 ||
+      FLAGS_compressed_secondary_cache_size > 0 ||
+      FLAGS_compressed_secondary_cache_ratio > 0.0) {
     MutexLock l(shared->GetMutex());
     shared->SetShouldStopBgThread();
     while (!shared->BgThreadsFinished()) {
diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc
index cdea77c193..cd1c978b81 100644
--- a/db_stress_tool/db_stress_gflags.cc
+++ b/db_stress_tool/db_stress_gflags.cc
@@ -136,6 +136,9 @@ DEFINE_uint64(db_write_buffer_size,
               ROCKSDB_NAMESPACE::Options().db_write_buffer_size,
               "Number of bytes to buffer in all memtables before compacting");
 
+DEFINE_bool(use_write_buffer_manager, false,
+            "Charge WriteBufferManager memory to the block cache");
+
 DEFINE_int32(
     write_buffer_size,
     static_cast<int32_t>(ROCKSDB_NAMESPACE::Options().write_buffer_size),
@@ -198,15 +201,23 @@ DEFINE_int32(open_files, ROCKSDB_NAMESPACE::Options().max_open_files,
              "Maximum number of files to keep open at the same time "
              "(use default if == 0)");
 
-DEFINE_int64(compressed_cache_size, 0,
-             "Number of bytes to use as a cache of compressed data."
-             " 0 means use default settings.");
+DEFINE_uint64(compressed_secondary_cache_size, 0,
+              "Number of bytes to use as a cache of compressed data."
+              " 0 means use default settings.");
 
-DEFINE_int32(
-    compressed_cache_numshardbits, -1,
-    "Number of shards for the compressed block cache is 2 ** "
-    "compressed_cache_numshardbits. Negative value means default settings. "
-    "This is applied only if compressed_cache_size is greater than 0.");
+DEFINE_int32(compressed_secondary_cache_numshardbits, -1,
+             "Number of shards for the compressed secondary cache is 2 ** "
+             "compressed_secondary_cache_numshardbits. "
+             "Negative value means default settings. This is applied only "
+             "if compressed_secondary_cache_size is greater than 0.");
+
+DEFINE_double(compressed_secondary_cache_ratio, 0.0,
+              "Fraction of block cache memory budget to use for compressed "
+              "secondary cache");
+
+DEFINE_int32(secondary_cache_update_interval, 30 * 1000 * 1000,
+             "Interval between modification of secondary cache parameters, in "
+             "microseconds");
 
 DEFINE_int32(compaction_style, ROCKSDB_NAMESPACE::Options().compaction_style,
              "");
@@ -1023,6 +1034,9 @@ DEFINE_string(secondary_cache_uri, "",
 DEFINE_int32(secondary_cache_fault_one_in, 0,
              "On non-zero, enables fault injection in secondary cache inserts"
              " and lookups");
+DEFINE_double(tiered_cache_percent_compressed, 0.0,
+              "Percentage of total block cache budget to allocate to the "
+              "compressed cache");
 DEFINE_int32(open_write_fault_one_in, 0,
              "On non-zero, enables fault injection on file writes "
              "during DB reopen.");
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 5b843eb5df..02933e00ca 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -112,6 +112,11 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
   std::shared_ptr<SecondaryCache> secondary_cache;
   if (!FLAGS_secondary_cache_uri.empty()) {
+    assert(!strstr(FLAGS_secondary_cache_uri.c_str(),
+                   "compressed_secondary_cache") ||
+           (FLAGS_compressed_secondary_cache_size == 0 &&
+            FLAGS_compressed_secondary_cache_ratio == 0.0 &&
+            !StartsWith(FLAGS_cache_type, "tiered_")));
     Status s = SecondaryCache::CreateFromString(
         config_options, FLAGS_secondary_cache_uri, &secondary_cache);
     if (secondary_cache == nullptr) {
@@ -125,36 +130,81 @@
         secondary_cache, static_cast<uint32_t>(FLAGS_seed),
         FLAGS_secondary_cache_fault_one_in);
     }
+  } else if (FLAGS_compressed_secondary_cache_size > 0) {
+    if (StartsWith(FLAGS_cache_type, "tiered_")) {
+      fprintf(stderr,
+              "Cannot specify both compressed_secondary_cache_size and %s\n",
+              FLAGS_cache_type.c_str());
+      exit(1);
+    }
+    CompressedSecondaryCacheOptions opts;
+    opts.capacity = FLAGS_compressed_secondary_cache_size;
+    secondary_cache = NewCompressedSecondaryCache(opts);
+    if (secondary_cache == nullptr) {
+      fprintf(stderr, "Failed to allocate compressed secondary cache\n");
+      exit(1);
+    }
+    compressed_secondary_cache = secondary_cache;
   }
 
-  if (FLAGS_cache_type == "clock_cache") {
+  std::string cache_type = FLAGS_cache_type;
+  size_t cache_size = FLAGS_cache_size;
+  bool tiered = false;
+  if (StartsWith(cache_type, "tiered_")) {
+    tiered = true;
+    cache_type.erase(0, strlen("tiered_"));
+  }
+  if (FLAGS_use_write_buffer_manager) {
+    cache_size += FLAGS_db_write_buffer_size;
+  }
+  if (cache_type == "clock_cache") {
     fprintf(stderr, "Old clock cache implementation has been removed.\n");
     exit(1);
-  } else if (EndsWith(FLAGS_cache_type, "hyper_clock_cache")) {
+  } else if (EndsWith(cache_type, "hyper_clock_cache")) {
     size_t estimated_entry_charge;
-    if (FLAGS_cache_type == "fixed_hyper_clock_cache" ||
-        FLAGS_cache_type == "hyper_clock_cache") {
+    if (cache_type == "fixed_hyper_clock_cache" ||
+        cache_type == "hyper_clock_cache") {
       estimated_entry_charge = FLAGS_block_size;
-    } else if (FLAGS_cache_type == "auto_hyper_clock_cache") {
+    } else if (cache_type == "auto_hyper_clock_cache") {
       estimated_entry_charge = 0;
     } else {
       fprintf(stderr, "Cache type not supported.");
      exit(1);
     }
-    HyperClockCacheOptions opts(FLAGS_cache_size, estimated_entry_charge,
+    HyperClockCacheOptions opts(cache_size, estimated_entry_charge,
                                 num_shard_bits);
     opts.hash_seed = BitwiseAnd(FLAGS_seed, INT32_MAX);
-    return opts.MakeSharedCache();
-  } else if (FLAGS_cache_type == "lru_cache") {
+    if (tiered) {
+      TieredCacheOptions tiered_opts;
+      tiered_opts.cache_opts = &opts;
+      tiered_opts.cache_type = PrimaryCacheType::kCacheTypeHCC;
+      tiered_opts.total_capacity = cache_size;
+      tiered_opts.compressed_secondary_ratio = 0.5;
+      block_cache = NewTieredCache(tiered_opts);
+    } else {
+      opts.secondary_cache = std::move(secondary_cache);
+      block_cache = opts.MakeSharedCache();
+    }
+  } else if (EndsWith(cache_type, "lru_cache")) {
     LRUCacheOptions opts;
     opts.capacity = capacity;
     opts.num_shard_bits = num_shard_bits;
-    opts.secondary_cache = std::move(secondary_cache);
-    return NewLRUCache(opts);
+    if (tiered) {
+      TieredCacheOptions tiered_opts;
+      tiered_opts.cache_opts = &opts;
+      tiered_opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
+      tiered_opts.total_capacity = cache_size;
+      tiered_opts.compressed_secondary_ratio = 0.5;
+      block_cache = NewTieredCache(tiered_opts);
+    } else {
+      opts.secondary_cache = std::move(secondary_cache);
+      block_cache = NewLRUCache(opts);
+    }
   } else {
     fprintf(stderr, "Cache type not supported.");
     exit(1);
   }
+  return block_cache;
 }
 
 std::vector<std::string> StressTest::GetBlobCompressionTags() {
@@ -3153,6 +3203,10 @@ void InitializeOptionsFromFlags(
       FLAGS_max_write_buffer_size_to_maintain;
   options.memtable_prefix_bloom_size_ratio =
       FLAGS_memtable_prefix_bloom_size_ratio;
+  if (FLAGS_use_write_buffer_manager) {
+    options.write_buffer_manager.reset(
+        new WriteBufferManager(FLAGS_db_write_buffer_size, block_cache));
+  }
   options.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering;
   options.disable_auto_compactions = FLAGS_disable_auto_compactions;
   options.max_background_compactions = FLAGS_max_background_compactions;
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index 7b2ce96a1d..15d974ab85 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -43,7 +43,7 @@ default_params = {
         [random.randint(0, 19), random.lognormvariate(2.3, 1.3)]
     ),
     "cache_index_and_filter_blocks": lambda: random.randint(0, 1),
-    "cache_size": 8388608,
+    "cache_size": lambda: random.choice([8388608, 33554432]),
     "charge_compression_dictionary_building_buffer": lambda: random.choice([0, 1]),
     "charge_filter_construction": lambda: random.choice([0, 1]),
     "charge_table_reader": lambda: random.choice([0, 1]),
@@ -126,7 +126,9 @@ default_params = {
     "mock_direct_io": False,
     "cache_type": lambda: random.choice(
         ["lru_cache", "fixed_hyper_clock_cache", "auto_hyper_clock_cache",
-         "auto_hyper_clock_cache"]
+         "auto_hyper_clock_cache", "tiered_lru_cache",
+         "tiered_fixed_hyper_clock_cache", "tiered_auto_hyper_clock_cache",
+         "tiered_auto_hyper_clock_cache"]
     ),
     "use_full_merge_v1": lambda: random.randint(0, 1),
     "use_merge": lambda: random.randint(0, 1),
@@ -163,6 +165,7 @@ default_params = {
     "db_write_buffer_size": lambda: random.choice(
         [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]
     ),
+    "use_write_buffer_manager": lambda: random.randint(0, 1),
     "avoid_unnecessary_blocking_io": random.randint(0, 1),
     "write_dbid_to_manifest": random.randint(0, 1),
     "avoid_flush_during_recovery": lambda: random.choice(
@@ -191,6 +194,7 @@ default_params = {
     ),
     "user_timestamp_size": 0,
     "secondary_cache_fault_one_in": lambda: random.choice([0, 0, 32]),
+    "compressed_secondary_cache_size": lambda: random.choice([8388608, 16777216]),
     "prepopulate_block_cache": lambda: random.choice([0, 1]),
"memtable_prefix_bloom_size_ratio": lambda: random.choice([0.001, 0.01, 0.1, 0.5]), "memtable_whole_key_filtering": lambda: random.randint(0, 1), @@ -202,7 +206,8 @@ default_params = { "secondary_cache_uri": lambda: random.choice( [ "", - "compressed_secondary_cache://capacity=8388608", + "", + "", "compressed_secondary_cache://capacity=8388608;enable_custom_split_merge=true", ] ), @@ -681,6 +686,22 @@ def finalize_and_sanitize(src_params): if dest_params["write_fault_one_in"] > 0: # background work may be disabled while DB is resuming after some error dest_params["max_write_buffer_number"] = max(dest_params["max_write_buffer_number"], 10) + if dest_params["secondary_cache_uri"].find("compressed_secondary_cache") >= 0: + dest_params["compressed_secondary_cache_size"] = 0 + dest_params["compressed_secondary_cache_ratio"] = 0.0 + if dest_params["cache_type"].find("tiered_") >= 0: + if dest_params["compressed_secondary_cache_size"] > 0: + dest_params["compressed_secondary_cache_ratio"] = \ + float(dest_params["compressed_secondary_cache_size"]/ \ + (dest_params["cache_size"] + dest_params["compressed_secondary_cache_size"])) + dest_params["compressed_secondary_cache_size"] = 0 + else: + dest_params["compressed_secondary_cache_ratio"] = 0.0 + dest_params["cache_type"] = dest_params["cache_type"].replace("tiered_", "") + if dest_params["use_write_buffer_manager"]: + if (dest_params["cache_size"] <= 0 + or dest_params["db_write_buffer_size"] <= 0): + dest_params["use_write_buffer_manager"] = 0 return dest_params