diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 5fc2c34a34..15cbe0afe9 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -396,6 +396,14 @@ DEFINE_int32(checkpoint_one_in, 0, "every N operations on average. 0 indicates CreateCheckpoint() " "is disabled."); +DEFINE_int32(ingest_external_file_one_in, 0, + "If non-zero, then IngestExternalFile() will be called once for " + "every N operations on average. 0 indicates IngestExternalFile() " + "is disabled."); + +DEFINE_int32(ingest_external_file_width, 1000, + "The width of the ingested external files."); + DEFINE_int32(compact_files_one_in, 0, "If non-zero, then CompactFiles() will be called once for every N " "operations on average. 0 indicates CompactFiles() is disabled."); @@ -1901,6 +1909,11 @@ class StressTest { } } + if (FLAGS_ingest_external_file_one_in > 0 && + thread->rand.Uniform(FLAGS_ingest_external_file_one_in) == 0) { + TestIngestExternalFile(thread, {rand_column_family}, {rand_key}, lock); + } + if (FLAGS_acquire_snapshot_one_in > 0 && thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { auto snapshot = db_->GetSnapshot(); @@ -1998,6 +2011,11 @@ class StressTest { const std::vector& rand_keys, std::unique_ptr& lock) = 0; + virtual void TestIngestExternalFile( + ThreadState* thread, const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) = 0; + // Given a key K, this creates an iterator which scans to K and then // does a random sequence of Next/Prev operations. virtual Status TestIterate(ThreadState* thread, @@ -2777,6 +2795,82 @@ class NonBatchedOpsStressTest : public StressTest { return s; } +#ifdef ROCKSDB_LITE + virtual void TestIngestExternalFile( + ThreadState* /* thread */, + const std::vector& /* rand_column_families */, + const std::vector& /* rand_keys */, + std::unique_ptr& /* lock */) { + assert(false); + fprintf(stderr, + "RocksDB lite does not support " + "TestIngestExternalFile\n"); + std::terminate(); + } +#else + virtual void TestIngestExternalFile( + ThreadState* thread, const std::vector& rand_column_families, + const std::vector& rand_keys, std::unique_ptr& lock) { + const std::string sst_filename = + FLAGS_db + "/." + ToString(thread->tid) + ".sst"; + Status s; + if (FLAGS_env->FileExists(sst_filename).ok()) { + // Maybe we terminated abnormally before, so cleanup to give this file + // ingestion a clean slate + s = FLAGS_env->DeleteFile(sst_filename); + } + + SstFileWriter sst_file_writer(EnvOptions(), options_); + if (s.ok()) { + s = sst_file_writer.Open(sst_filename); + } + int64_t key_base = rand_keys[0]; + int column_family = rand_column_families[0]; + std::vector > range_locks; + std::vector values; + SharedState* shared = thread->shared; + + // Grab locks, set pending state on expected values, and add keys + for (int64_t key = key_base; + s.ok() && key < std::min(key_base + FLAGS_ingest_external_file_width, + shared->GetMaxKey()); + ++key) { + if (key == key_base) { + range_locks.emplace_back(std::move(lock)); + } else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { + range_locks.emplace_back( + new MutexLock(shared->GetMutexForKey(column_family, key))); + } + + uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + values.push_back(value_base); + shared->Put(column_family, key, value_base, true /* pending */); + + char value[100]; + size_t value_len = GenerateValue(value_base, value, sizeof(value)); + auto key_str = Key(key); + s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len)); + } + + if (s.ok()) { + s = sst_file_writer.Finish(); + } + if (s.ok()) { + s = db_->IngestExternalFile(column_families_[column_family], + {sst_filename}, IngestExternalFileOptions()); + } + if (!s.ok()) { + fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str()); + std::terminate(); + } + int64_t key = key_base; + for (int64_t value : values) { + shared->Put(column_family, key, value, false /* pending */); + ++key; + } + } +#endif // ROCKSDB_LITE + bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, SharedState* shared, const std::string& value_from_db, Status s, bool strict = false) const { @@ -2905,6 +2999,18 @@ class BatchedOpsStressTest : public StressTest { "TestDeleteRange"); } + virtual void TestIngestExternalFile( + ThreadState* /* thread */, + const std::vector& /* rand_column_families */, + const std::vector& /* rand_keys */, + std::unique_ptr& /* lock */) { + assert(false); + fprintf(stderr, + "BatchedOpsStressTest does not support " + "TestIngestExternalFile\n"); + std::terminate(); + } + // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K // in the same snapshot, and verifies that all the values are of the form // "0"+V, "1"+V,..."9"+V. @@ -3132,6 +3238,11 @@ int main(int argc, char** argv) { "Error: nooverwritepercent must not be 100 when using merge operands"); exit(1); } + if (FLAGS_ingest_external_file_one_in > 0 && FLAGS_nooverwritepercent > 0) { + fprintf(stderr, + "Error: nooverwritepercent must be 0 when using file ingestion\n"); + exit(1); + } // Choose a location for the test database if none given with --db= if (FLAGS_db.empty()) {