// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" #include #include "rocksdb/secondary_cache.h" #include "util/file_checksum_helper.h" #include "util/xxhash.h" ROCKSDB_NAMESPACE::Env* db_stress_listener_env = nullptr; ROCKSDB_NAMESPACE::Env* db_stress_env = nullptr; // If non-null, injects read error at a rate specified by the // read_fault_one_in or write_fault_one_in flag std::shared_ptr fault_fs_guard; std::shared_ptr compressed_secondary_cache; std::shared_ptr block_cache; enum ROCKSDB_NAMESPACE::CompressionType compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression; enum ROCKSDB_NAMESPACE::CompressionType bottommost_compression_type_e = ROCKSDB_NAMESPACE::kSnappyCompression; enum ROCKSDB_NAMESPACE::ChecksumType checksum_type_e = ROCKSDB_NAMESPACE::kCRC32c; enum RepFactory FLAGS_rep_factory = kSkipList; std::vector sum_probs(100001); constexpr int64_t zipf_sum_size = 100000; namespace ROCKSDB_NAMESPACE { // Zipfian distribution is generated based on a pre-calculated array. // It should be used before start the stress test. // First, the probability distribution function (PDF) of this Zipfian follows // power low. P(x) = 1/(x^alpha). // So we calculate the PDF when x is from 0 to zipf_sum_size in first for loop // and add the PDF value togetger as c. So we get the total probability in c. // Next, we calculate inverse CDF of Zipfian and store the value of each in // an array (sum_probs). The rank is from 0 to zipf_sum_size. For example, for // integer k, its Zipfian CDF value is sum_probs[k]. // Third, when we need to get an integer whose probability follows Zipfian // distribution, we use a rand_seed [0,1] which follows uniform distribution // as a seed and search it in the sum_probs via binary search. When we find // the closest sum_probs[i] of rand_seed, i is the integer that in // [0, zipf_sum_size] following Zipfian distribution with parameter alpha. // Finally, we can scale i to [0, max_key] scale. // In order to avoid that hot keys are close to each other and skew towards 0, // we use Rando64 to shuffle it. void InitializeHotKeyGenerator(double alpha) { double c = 0; for (int64_t i = 1; i <= zipf_sum_size; i++) { c = c + (1.0 / std::pow(static_cast(i), alpha)); } c = 1.0 / c; sum_probs[0] = 0; for (int64_t i = 1; i <= zipf_sum_size; i++) { sum_probs[i] = sum_probs[i - 1] + c / std::pow(static_cast(i), alpha); } } // Generate one key that follows the Zipfian distribution. The skewness // is decided by the parameter alpha. Input is the rand_seed [0,1] and // the max of the key to be generated. If we directly return tmp_zipf_seed, // the closer to 0, the higher probability will be. To randomly distribute // the hot keys in [0, max_key], we use Random64 to shuffle it. int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) { int64_t low = 1, mid, high = zipf_sum_size, zipf = 0; while (low <= high) { mid = (low + high) / 2; if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) { zipf = mid; break; } else if (sum_probs[mid] >= rand_seed) { high = mid - 1; } else { low = mid + 1; } } int64_t tmp_zipf_seed = zipf * max_key / zipf_sum_size; Random64 rand_local(tmp_zipf_seed); return rand_local.Next() % max_key; } void PoolSizeChangeThread(void* v) { assert(FLAGS_compaction_thread_pool_adjust_interval > 0); ThreadState* thread = static_cast(v); SharedState* shared = thread->shared; while (true) { { MutexLock l(shared->GetMutex()); if (shared->ShouldStopBgThread()) { shared->IncBgThreadsFinished(); if (shared->BgThreadsFinished()) { shared->GetCondVar()->SignalAll(); } return; } } auto thread_pool_size_base = FLAGS_max_background_compactions; auto thread_pool_size_var = FLAGS_compaction_thread_pool_variations; int new_thread_pool_size = thread_pool_size_base - thread_pool_size_var + thread->rand.Next() % (thread_pool_size_var * 2 + 1); if (new_thread_pool_size < 1) { new_thread_pool_size = 1; } db_stress_env->SetBackgroundThreads(new_thread_pool_size, ROCKSDB_NAMESPACE::Env::Priority::LOW); // Sleep up to 3 seconds db_stress_env->SleepForMicroseconds( thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * 1000 + 1); } } void DbVerificationThread(void* v) { assert(FLAGS_continuous_verification_interval > 0); auto* thread = static_cast(v); SharedState* shared = thread->shared; StressTest* stress_test = shared->GetStressTest(); assert(stress_test != nullptr); while (true) { { MutexLock l(shared->GetMutex()); if (shared->ShouldStopBgThread()) { shared->IncBgThreadsFinished(); if (shared->BgThreadsFinished()) { shared->GetCondVar()->SignalAll(); } return; } } if (!shared->HasVerificationFailedYet()) { stress_test->ContinuouslyVerifyDb(thread); } db_stress_env->SleepForMicroseconds( thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 + 1); } } void CompressedCacheSetCapacityThread(void* v) { assert(FLAGS_compressed_secondary_cache_size > 0 || FLAGS_compressed_secondary_cache_ratio > 0.0); auto* thread = static_cast(v); SharedState* shared = thread->shared; while (true) { { MutexLock l(shared->GetMutex()); if (shared->ShouldStopBgThread()) { shared->IncBgThreadsFinished(); if (shared->BgThreadsFinished()) { shared->GetCondVar()->SignalAll(); } return; } } db_stress_env->SleepForMicroseconds(FLAGS_secondary_cache_update_interval); if (FLAGS_compressed_secondary_cache_size > 0) { Status s = compressed_secondary_cache->SetCapacity(0); size_t capacity; if (s.ok()) { s = compressed_secondary_cache->GetCapacity(capacity); assert(capacity == 0); } db_stress_env->SleepForMicroseconds(10 * 1000 * 1000); if (s.ok()) { s = compressed_secondary_cache->SetCapacity( FLAGS_compressed_secondary_cache_size); } if (s.ok()) { s = compressed_secondary_cache->GetCapacity(capacity); assert(capacity == FLAGS_compressed_secondary_cache_size); } if (!s.ok()) { fprintf(stderr, "Compressed cache Set/GetCapacity returned error: %s\n", s.ToString().c_str()); } } else if (FLAGS_compressed_secondary_cache_ratio > 0.0) { if (thread->rand.OneIn(2)) { // if (thread->rand.OneIn(2)) { size_t capacity = block_cache->GetCapacity(); size_t adjustment; if (FLAGS_use_write_buffer_manager && FLAGS_db_write_buffer_size > 0) { adjustment = (capacity - FLAGS_db_write_buffer_size); } else { adjustment = capacity; } // Lower by upto 50% of usable block cache capacity adjustment = (adjustment * thread->rand.Uniform(50)) / 100; block_cache->SetCapacity(capacity - adjustment); fprintf(stdout, "New cache capacity = %lu\n", block_cache->GetCapacity()); db_stress_env->SleepForMicroseconds(10 * 1000 * 1000); block_cache->SetCapacity(capacity); } else { Status s; double new_comp_cache_ratio = (double)thread->rand.Uniform( FLAGS_compressed_secondary_cache_ratio * 100) / 100; fprintf(stdout, "New comp cache ratio = %f\n", new_comp_cache_ratio); s = UpdateTieredCache(block_cache, /*capacity*/ -1, new_comp_cache_ratio); if (s.ok()) { db_stress_env->SleepForMicroseconds(10 * 1000 * 1000); } if (s.ok()) { s = UpdateTieredCache(block_cache, /*capacity*/ -1, FLAGS_compressed_secondary_cache_ratio); } if (!s.ok()) { fprintf(stderr, "UpdateTieredCache returned error: %s\n", s.ToString().c_str()); } } } } } void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { if (!FLAGS_verbose) { return; } std::string tmp; tmp.reserve(sz * 2 + 16); char buf[4]; for (size_t i = 0; i < sz; i++) { snprintf(buf, 4, "%X", value[i]); tmp.append(buf); } auto key_str = Key(key); Slice key_slice = key_str; fprintf(stdout, "[CF %d] %s (%" PRIi64 ") == > (%" ROCKSDB_PRIszt ") %s\n", cf, key_slice.ToString(true).c_str(), key, sz, tmp.c_str()); } // Note that if hot_key_alpha != 0, it generates the key based on Zipfian // distribution. Keys are randomly scattered to [0, FLAGS_max_key]. It does // not ensure the order of the keys being generated and the keys does not have // the active range which is related to FLAGS_active_width. int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) { const double completed_ratio = static_cast(iteration) / FLAGS_ops_per_thread; const int64_t base_key = static_cast( completed_ratio * (FLAGS_max_key - FLAGS_active_width)); int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width; int64_t cur_key = rand_seed; if (FLAGS_hot_key_alpha != 0) { // If set the Zipfian distribution Alpha to non 0, use Zipfian double float_rand = (static_cast(thread->rand.Next() % FLAGS_max_key)) / FLAGS_max_key; cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key); } return cur_key; } // Note that if hot_key_alpha != 0, it generates the key based on Zipfian // distribution. Keys being generated are in random order. // If user want to generate keys based on uniform distribution, user needs to // set hot_key_alpha == 0. It will generate the random keys in increasing // order in the key array (ensure key[i] >= key[i+1]) and constrained in a // range related to FLAGS_active_width. std::vector GenerateNKeys(ThreadState* thread, int num_keys, uint64_t iteration) { const double completed_ratio = static_cast(iteration) / FLAGS_ops_per_thread; const int64_t base_key = static_cast( completed_ratio * (FLAGS_max_key - FLAGS_active_width)); std::vector keys; keys.reserve(num_keys); int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width; keys.push_back(next_key); for (int i = 1; i < num_keys; ++i) { // Generate the key follows zipfian distribution if (FLAGS_hot_key_alpha != 0) { double float_rand = (static_cast(thread->rand.Next() % FLAGS_max_key)) / FLAGS_max_key; next_key = GetOneHotKeyID(float_rand, FLAGS_max_key); } else { // This may result in some duplicate keys next_key = next_key + thread->rand.Next() % (FLAGS_active_width - (next_key - base_key)); } keys.push_back(next_key); } return keys; } size_t GenerateValue(uint32_t rand, char* v, size_t max_sz) { size_t value_sz = ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t)); (void)max_sz; PutUnaligned(reinterpret_cast(v), rand); for (size_t i = sizeof(uint32_t); i < value_sz; i++) { v[i] = (char)(rand ^ i); } v[value_sz] = '\0'; return value_sz; // the size of the value set. } uint32_t GetValueBase(Slice s) { assert(s.size() >= sizeof(uint32_t)); uint32_t res; GetUnaligned(reinterpret_cast(s.data()), &res); return res; } AttributeGroups GenerateAttributeGroups( const std::vector& cfhs, uint32_t value_base, const Slice& slice) { WideColumns columns = GenerateWideColumns(value_base, slice); AttributeGroups attribute_groups; for (auto* cfh : cfhs) { attribute_groups.emplace_back(cfh, columns); } return attribute_groups; } WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice) { WideColumns columns; constexpr size_t max_columns = 4; const size_t num_columns = (value_base % max_columns) + 1; columns.reserve(num_columns); assert(slice.size() >= num_columns); columns.emplace_back(kDefaultWideColumnName, slice); for (size_t i = 1; i < num_columns; ++i) { const Slice name(slice.data(), i); const Slice value(slice.data() + i, slice.size() - i); columns.emplace_back(name, value); } return columns; } WideColumns GenerateExpectedWideColumns(uint32_t value_base, const Slice& slice) { if (FLAGS_use_put_entity_one_in == 0 || (value_base % FLAGS_use_put_entity_one_in) != 0) { return WideColumns{{kDefaultWideColumnName, slice}}; } WideColumns columns = GenerateWideColumns(value_base, slice); WideColumnsHelper::SortColumns(columns); return columns; } bool VerifyWideColumns(const Slice& value, const WideColumns& columns) { if (value.size() < sizeof(uint32_t)) { return false; } const uint32_t value_base = GetValueBase(value); const WideColumns expected_columns = GenerateExpectedWideColumns(value_base, value); if (columns != expected_columns) { return false; } return true; } bool VerifyWideColumns(const WideColumns& columns) { if (!WideColumnsHelper::HasDefaultColumn(columns)) { return false; } const Slice& value_of_default = WideColumnsHelper::GetDefaultColumn(columns); return VerifyWideColumns(value_of_default, columns); } std::string GetNowNanos() { uint64_t t = db_stress_env->NowNanos(); std::string ret; PutFixed64(&ret, t); return ret; } uint64_t GetWriteUnixTime(ThreadState* thread) { static uint64_t kPreserveSeconds = std::max(FLAGS_preserve_internal_time_seconds, FLAGS_preclude_last_level_data_seconds); static uint64_t kFallbackTime = std::numeric_limits::max(); int64_t write_time = 0; Status s = db_stress_env->GetCurrentTime(&write_time); uint32_t write_time_mode = thread->rand.Uniform(3); if (write_time_mode == 0 || !s.ok()) { return kFallbackTime; } else if (write_time_mode == 1) { uint64_t delta = kPreserveSeconds > 0 ? static_cast(thread->rand.Uniform( static_cast(kPreserveSeconds))) : 0; return static_cast(write_time) - delta; } else { return static_cast(write_time) - kPreserveSeconds; } } namespace { class MyXXH64Checksum : public FileChecksumGenerator { public: explicit MyXXH64Checksum(bool big) : big_(big) { state_ = XXH64_createState(); XXH64_reset(state_, 0); } ~MyXXH64Checksum() override { XXH64_freeState(state_); } void Update(const char* data, size_t n) override { XXH64_update(state_, data, n); } void Finalize() override { assert(str_.empty()); uint64_t digest = XXH64_digest(state_); // Store as little endian raw bytes PutFixed64(&str_, digest); if (big_) { // Throw in some more data for stress testing (448 bits total) PutFixed64(&str_, GetSliceHash64(str_)); PutFixed64(&str_, GetSliceHash64(str_)); PutFixed64(&str_, GetSliceHash64(str_)); PutFixed64(&str_, GetSliceHash64(str_)); PutFixed64(&str_, GetSliceHash64(str_)); PutFixed64(&str_, GetSliceHash64(str_)); } } std::string GetChecksum() const override { assert(!str_.empty()); return str_; } const char* Name() const override { return big_ ? "MyBigChecksum" : "MyXXH64Checksum"; } private: bool big_; XXH64_state_t* state_; std::string str_; }; class DbStressChecksumGenFactory : public FileChecksumGenFactory { std::string default_func_name_; std::unique_ptr CreateFromFuncName( const std::string& func_name) { std::unique_ptr rv; if (func_name == "FileChecksumCrc32c") { rv.reset(new FileChecksumGenCrc32c(FileChecksumGenContext())); } else if (func_name == "MyXXH64Checksum") { rv.reset(new MyXXH64Checksum(false /* big */)); } else if (func_name == "MyBigChecksum") { rv.reset(new MyXXH64Checksum(true /* big */)); } else { // Should be a recognized function when we get here assert(false); } return rv; } public: explicit DbStressChecksumGenFactory(const std::string& default_func_name) : default_func_name_(default_func_name) {} std::unique_ptr CreateFileChecksumGenerator( const FileChecksumGenContext& context) override { if (context.requested_checksum_func_name.empty()) { return CreateFromFuncName(default_func_name_); } else { return CreateFromFuncName(context.requested_checksum_func_name); } } const char* Name() const override { return "FileChecksumGenCrc32cFactory"; } }; } // namespace std::shared_ptr GetFileChecksumImpl( const std::string& name) { // Translate from friendly names to internal names std::string internal_name; if (name == "crc32c") { internal_name = "FileChecksumCrc32c"; } else if (name == "xxh64") { internal_name = "MyXXH64Checksum"; } else if (name == "big") { internal_name = "MyBigChecksum"; } else { assert(name.empty() || name == "none"); return nullptr; } return std::make_shared(internal_name); } Status DeleteFilesInDirectory(const std::string& dirname) { std::vector filenames; Status s = Env::Default()->GetChildren(dirname, &filenames); for (size_t i = 0; s.ok() && i < filenames.size(); ++i) { s = Env::Default()->DeleteFile(dirname + "/" + filenames[i]); } return s; } Status SaveFilesInDirectory(const std::string& src_dirname, const std::string& dst_dirname) { std::vector filenames; Status s = Env::Default()->GetChildren(src_dirname, &filenames); for (size_t i = 0; s.ok() && i < filenames.size(); ++i) { bool is_dir = false; s = Env::Default()->IsDirectory(src_dirname + "/" + filenames[i], &is_dir); if (s.ok()) { if (is_dir) { continue; } s = Env::Default()->LinkFile(src_dirname + "/" + filenames[i], dst_dirname + "/" + filenames[i]); } } return s; } Status InitUnverifiedSubdir(const std::string& dirname) { Status s = Env::Default()->FileExists(dirname); if (s.IsNotFound()) { return Status::OK(); } const std::string kUnverifiedDirname = dirname + "/unverified"; if (s.ok()) { s = Env::Default()->CreateDirIfMissing(kUnverifiedDirname); } if (s.ok()) { // It might already exist with some stale contents. Delete any such // contents. s = DeleteFilesInDirectory(kUnverifiedDirname); } if (s.ok()) { s = SaveFilesInDirectory(dirname, kUnverifiedDirname); } return s; } Status DestroyUnverifiedSubdir(const std::string& dirname) { Status s = Env::Default()->FileExists(dirname); if (s.IsNotFound()) { return Status::OK(); } const std::string kUnverifiedDirname = dirname + "/unverified"; if (s.ok()) { s = Env::Default()->FileExists(kUnverifiedDirname); } if (s.IsNotFound()) { return Status::OK(); } if (s.ok()) { s = DeleteFilesInDirectory(kUnverifiedDirname); } if (s.ok()) { s = Env::Default()->DeleteDir(kUnverifiedDirname); } return s; } } // namespace ROCKSDB_NAMESPACE #endif // GFLAGS