diff --git a/TARGETS b/TARGETS index 62496a2255..f56cceb40f 100644 --- a/TARGETS +++ b/TARGETS @@ -393,6 +393,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[ "db_stress_tool/db_stress_stat.cc", "db_stress_tool/db_stress_test_base.cc", "db_stress_tool/db_stress_tool.cc", + "db_stress_tool/db_stress_wide_merge_operator.cc", "db_stress_tool/expected_state.cc", "db_stress_tool/expected_value.cc", "db_stress_tool/multi_ops_txns_stress.cc", diff --git a/db_stress_tool/CMakeLists.txt b/db_stress_tool/CMakeLists.txt index 51d6ea0d6f..60c02e173f 100644 --- a/db_stress_tool/CMakeLists.txt +++ b/db_stress_tool/CMakeLists.txt @@ -9,6 +9,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX} db_stress_shared_state.cc db_stress_stat.cc db_stress_test_base.cc + db_stress_wide_merge_operator.cc db_stress_tool.cc expected_state.cc expected_value.cc diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index 0872f28422..7fb89b60bb 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -52,11 +52,11 @@ class BatchedOpsStressTest : public StressTest { const std::string k = num + key_body; const std::string v = value_body + num; - if (FLAGS_use_merge) { - batch.Merge(cfh, k, v); - } else if (FLAGS_use_put_entity_one_in > 0 && - (value_base % FLAGS_use_put_entity_one_in) == 0) { + if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v)); + } else if (FLAGS_use_merge) { + batch.Merge(cfh, k, v); } else { batch.Put(cfh, k, v); } diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index f3d9b71d97..a7b0895f37 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -36,18 +36,15 @@ class CfConsistencyStressTest : public StressTest { WriteBatch batch; - const bool use_put_entity = !FLAGS_use_merge && - FLAGS_use_put_entity_one_in > 0 && - (value_base % FLAGS_use_put_entity_one_in) == 0; - for (auto cf : rand_column_families) { ColumnFamilyHandle* const cfh = column_families_[cf]; assert(cfh); - if (FLAGS_use_merge) { - batch.Merge(cfh, k, v); - } else if (use_put_entity) { + if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v)); + } else if (FLAGS_use_merge) { + batch.Merge(cfh, k, v); } else { batch.Put(cfh, k, v); } diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 3f28b7a8ec..80c1858e76 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -17,6 +17,7 @@ #include "db_stress_tool/db_stress_compaction_filter.h" #include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_table_properties_collector.h" +#include "db_stress_tool/db_stress_wide_merge_operator.h" #include "rocksdb/convenience.h" #include "rocksdb/filter_policy.h" #include "rocksdb/secondary_cache.h" @@ -511,7 +512,11 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, ts = GetNowNanos(); } - if (FLAGS_use_merge) { + if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { + s = db_->PutEntity(write_opts, cfh, key, + GenerateWideColumns(value_base, v)); + } else if (FLAGS_use_merge) { if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size > 0) { s = db_->Merge(write_opts, cfh, key, ts, v); @@ -523,9 +528,6 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, write_opts, /*thread=*/nullptr, [&](Transaction& txn) { return txn.Merge(cfh, key, v); }); } - } else if (FLAGS_use_put_entity_one_in > 0) { - s = db_->PutEntity(write_opts, cfh, key, - GenerateWideColumns(value_base, v)); } else { if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size > 0) { @@ -2755,8 +2757,7 @@ void StressTest::Open(SharedState* shared, bool reopen) { if (s.ok()) { db_ = blob_db; } - } else - { + } else { if (db_preload_finished_.load() && FLAGS_read_only) { s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db, cf_descriptors, &column_families_, &db_); @@ -3337,7 +3338,11 @@ void InitializeOptionsFromFlags( if (FLAGS_use_full_merge_v1) { options.merge_operator = MergeOperators::CreateDeprecatedPutOperator(); } else { - options.merge_operator = MergeOperators::CreatePutOperator(); + if (FLAGS_use_put_entity_one_in > 0) { + options.merge_operator = std::make_shared(); + } else { + options.merge_operator = MergeOperators::CreatePutOperator(); + } } if (FLAGS_enable_compaction_filter) { diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 787efe47de..9c57dafd7a 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -308,11 +308,11 @@ int db_stress_tool(int argc, char** argv) { } if (FLAGS_use_put_entity_one_in > 0 && - (FLAGS_use_merge || FLAGS_use_full_merge_v1 || FLAGS_use_txn || - FLAGS_test_multi_ops_txns || FLAGS_user_timestamp_size > 0)) { + (FLAGS_use_full_merge_v1 || FLAGS_use_txn || FLAGS_test_multi_ops_txns || + FLAGS_user_timestamp_size > 0)) { fprintf(stderr, - "PutEntity is currently incompatible with Merge," - " transactions, and user-defined timestamps\n"); + "Wide columns are incompatible with V1 Merge, transactions, and " + "user-defined timestamps\n"); exit(1); } diff --git a/db_stress_tool/db_stress_wide_merge_operator.cc b/db_stress_tool/db_stress_wide_merge_operator.cc new file mode 100644 index 0000000000..1fcfc30424 --- /dev/null +++ b/db_stress_tool/db_stress_wide_merge_operator.cc @@ -0,0 +1,51 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifdef GFLAGS + +#include "db_stress_tool/db_stress_wide_merge_operator.h" + +#include "db_stress_tool/db_stress_common.h" + +namespace ROCKSDB_NAMESPACE { + +bool DBStressWideMergeOperator::FullMergeV3( + const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const { + assert(!merge_in.operand_list.empty()); + assert(merge_out); + + const Slice& latest = merge_in.operand_list.back(); + + if (latest.size() < sizeof(uint32_t)) { + return false; + } + + const uint32_t value_base = GetValueBase(latest); + + if (FLAGS_use_put_entity_one_in == 0 || + (value_base % FLAGS_use_put_entity_one_in) != 0) { + merge_out->new_value = latest; + return true; + } + + const auto columns = GenerateWideColumns(value_base, latest); + + merge_out->new_value = MergeOperationOutputV3::NewColumns(); + auto& new_columns = + std::get(merge_out->new_value); + new_columns.reserve(columns.size()); + + for (const auto& column : columns) { + new_columns.emplace_back(column.name().ToString(), + column.value().ToString()); + } + + return true; +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // GFLAGS diff --git a/db_stress_tool/db_stress_wide_merge_operator.h b/db_stress_tool/db_stress_wide_merge_operator.h new file mode 100644 index 0000000000..cba4f6b6b8 --- /dev/null +++ b/db_stress_tool/db_stress_wide_merge_operator.h @@ -0,0 +1,27 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include "rocksdb/merge_operator.h" + +namespace ROCKSDB_NAMESPACE { + +// A test merge operator that implements the wide-column aware FullMergeV3 +// interface. Similarly to the simple "put" type merge operators, the merge +// result is based on the last merge operand; however, the merge result can +// potentially be a wide-column entity, depending on the value base encoded into +// the merge operand and the value of the "use_put_entity_one_in" stress test +// option. Following the same rule as for writes ensures that the queries +// issued by the validation logic receive the expected results. +class DBStressWideMergeOperator : public MergeOperator { + public: + bool FullMergeV3(const MergeOperationInputV3& merge_in, + MergeOperationOutputV3* merge_out) const override; + + const char* Name() const override { return "DBStressWideMergeOperator"; } +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 80e5942e8d..eeb44560d9 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -1279,7 +1279,11 @@ class NonBatchedOpsStressTest : public StressTest { Status s; - if (FLAGS_use_merge) { + if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { + s = db_->PutEntity(write_opts, cfh, k, + GenerateWideColumns(value_base, v)); + } else if (FLAGS_use_merge) { if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size == 0) { s = db_->Merge(write_opts, cfh, k, v); @@ -1291,10 +1295,6 @@ class NonBatchedOpsStressTest : public StressTest { return txn.Merge(cfh, k, v); }); } - } else if (FLAGS_use_put_entity_one_in > 0 && - (value_base % FLAGS_use_put_entity_one_in) == 0) { - s = db_->PutEntity(write_opts, cfh, k, - GenerateWideColumns(value_base, v)); } else { if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size == 0) { @@ -1542,11 +1542,8 @@ class NonBatchedOpsStressTest : public StressTest { const Slice k(key_str); const Slice v(value, value_len); - const bool use_put_entity = - !FLAGS_use_merge && FLAGS_use_put_entity_one_in > 0 && - (value_base % FLAGS_use_put_entity_one_in) == 0; - - if (use_put_entity) { + if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { WideColumns columns = GenerateWideColumns(value_base, v); s = sst_file_writer.PutEntity(k, columns); } else { diff --git a/src.mk b/src.mk index edc8bdef5e..f6927256aa 100644 --- a/src.mk +++ b/src.mk @@ -380,6 +380,7 @@ STRESS_LIB_SOURCES = \ db_stress_tool/db_stress_stat.cc \ db_stress_tool/db_stress_test_base.cc \ db_stress_tool/db_stress_tool.cc \ + db_stress_tool/db_stress_wide_merge_operator.cc \ db_stress_tool/expected_state.cc \ db_stress_tool/expected_value.cc \ db_stress_tool/no_batched_ops_stress.cc \ diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index dbfe1c68b3..8b5d29d086 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -673,9 +673,8 @@ def finalize_and_sanitize(src_params): if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0: dest_params["sync_fault_injection"] = 0 dest_params["manual_wal_flush_one_in"] = 0 - # PutEntity is currently incompatible with Merge + # Wide column stress tests require FullMergeV3 if dest_params["use_put_entity_one_in"] != 0: - dest_params["use_merge"] = 0 dest_params["use_full_merge_v1"] = 0 if dest_params["file_checksum_impl"] == "none": dest_params["verify_file_checksums_one_in"] = 0