mirror of https://github.com/facebook/rocksdb.git
Add the wide-column aware merge API to the stress tests (#11906)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/11906 The patch adds stress test coverage for the wide-column aware `FullMergeV3` API by implementing a new `DBStressWideMergeOperator`. This operator is similar to `PutOperator` / `PutOperatorV2` in the sense that its result is based on the last merge operand; however, the merge result can be either a plain value or a wide-column entity, depending on the value base encoded into the operand and the value of the `use_put_entity_one_in` stress test parameter. Following the same rule for merge results that we do for writes ensures that the queries issued by the validation logic receive the expected results. The new operator is used instead of `PutOperatorV2` whenever `use_put_entity_one_in` is positive. Note that the patch also makes it possible to set `use_put_entity_one_in` and `use_merge` (but not `use_full_merge_v1`) at the same time, giving `use_put_entity_one_in` precedence, so the stress test will use `PutEntity` for writes passing the `use_put_entity_one_in` check described above and `Merge` for any other writes. Reviewed By: jaykorean Differential Revision: D49760024 fbshipit-source-id: 3893602c3e7935381b484f4f5026f1983e3a04a9
This commit is contained in:
parent
8b566964b8
commit
01e2d33565
1
TARGETS
1
TARGETS
|
@ -393,6 +393,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[
|
|||
"db_stress_tool/db_stress_stat.cc",
|
||||
"db_stress_tool/db_stress_test_base.cc",
|
||||
"db_stress_tool/db_stress_tool.cc",
|
||||
"db_stress_tool/db_stress_wide_merge_operator.cc",
|
||||
"db_stress_tool/expected_state.cc",
|
||||
"db_stress_tool/expected_value.cc",
|
||||
"db_stress_tool/multi_ops_txns_stress.cc",
|
||||
|
|
|
@ -9,6 +9,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX}
|
|||
db_stress_shared_state.cc
|
||||
db_stress_stat.cc
|
||||
db_stress_test_base.cc
|
||||
db_stress_wide_merge_operator.cc
|
||||
db_stress_tool.cc
|
||||
expected_state.cc
|
||||
expected_value.cc
|
||||
|
|
|
@ -52,11 +52,11 @@ class BatchedOpsStressTest : public StressTest {
|
|||
const std::string k = num + key_body;
|
||||
const std::string v = value_body + num;
|
||||
|
||||
if (FLAGS_use_merge) {
|
||||
batch.Merge(cfh, k, v);
|
||||
} else if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
|
||||
} else if (FLAGS_use_merge) {
|
||||
batch.Merge(cfh, k, v);
|
||||
} else {
|
||||
batch.Put(cfh, k, v);
|
||||
}
|
||||
|
|
|
@ -36,18 +36,15 @@ class CfConsistencyStressTest : public StressTest {
|
|||
|
||||
WriteBatch batch;
|
||||
|
||||
const bool use_put_entity = !FLAGS_use_merge &&
|
||||
FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0;
|
||||
|
||||
for (auto cf : rand_column_families) {
|
||||
ColumnFamilyHandle* const cfh = column_families_[cf];
|
||||
assert(cfh);
|
||||
|
||||
if (FLAGS_use_merge) {
|
||||
batch.Merge(cfh, k, v);
|
||||
} else if (use_put_entity) {
|
||||
if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
|
||||
} else if (FLAGS_use_merge) {
|
||||
batch.Merge(cfh, k, v);
|
||||
} else {
|
||||
batch.Put(cfh, k, v);
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "db_stress_tool/db_stress_compaction_filter.h"
|
||||
#include "db_stress_tool/db_stress_driver.h"
|
||||
#include "db_stress_tool/db_stress_table_properties_collector.h"
|
||||
#include "db_stress_tool/db_stress_wide_merge_operator.h"
|
||||
#include "rocksdb/convenience.h"
|
||||
#include "rocksdb/filter_policy.h"
|
||||
#include "rocksdb/secondary_cache.h"
|
||||
|
@ -511,7 +512,11 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
|
|||
ts = GetNowNanos();
|
||||
}
|
||||
|
||||
if (FLAGS_use_merge) {
|
||||
if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
s = db_->PutEntity(write_opts, cfh, key,
|
||||
GenerateWideColumns(value_base, v));
|
||||
} else if (FLAGS_use_merge) {
|
||||
if (!FLAGS_use_txn) {
|
||||
if (FLAGS_user_timestamp_size > 0) {
|
||||
s = db_->Merge(write_opts, cfh, key, ts, v);
|
||||
|
@ -523,9 +528,6 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
|
|||
write_opts, /*thread=*/nullptr,
|
||||
[&](Transaction& txn) { return txn.Merge(cfh, key, v); });
|
||||
}
|
||||
} else if (FLAGS_use_put_entity_one_in > 0) {
|
||||
s = db_->PutEntity(write_opts, cfh, key,
|
||||
GenerateWideColumns(value_base, v));
|
||||
} else {
|
||||
if (!FLAGS_use_txn) {
|
||||
if (FLAGS_user_timestamp_size > 0) {
|
||||
|
@ -2755,8 +2757,7 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
if (s.ok()) {
|
||||
db_ = blob_db;
|
||||
}
|
||||
} else
|
||||
{
|
||||
} else {
|
||||
if (db_preload_finished_.load() && FLAGS_read_only) {
|
||||
s = DB::OpenForReadOnly(DBOptions(options_), FLAGS_db,
|
||||
cf_descriptors, &column_families_, &db_);
|
||||
|
@ -3337,7 +3338,11 @@ void InitializeOptionsFromFlags(
|
|||
if (FLAGS_use_full_merge_v1) {
|
||||
options.merge_operator = MergeOperators::CreateDeprecatedPutOperator();
|
||||
} else {
|
||||
options.merge_operator = MergeOperators::CreatePutOperator();
|
||||
if (FLAGS_use_put_entity_one_in > 0) {
|
||||
options.merge_operator = std::make_shared<DBStressWideMergeOperator>();
|
||||
} else {
|
||||
options.merge_operator = MergeOperators::CreatePutOperator();
|
||||
}
|
||||
}
|
||||
|
||||
if (FLAGS_enable_compaction_filter) {
|
||||
|
|
|
@ -308,11 +308,11 @@ int db_stress_tool(int argc, char** argv) {
|
|||
}
|
||||
|
||||
if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(FLAGS_use_merge || FLAGS_use_full_merge_v1 || FLAGS_use_txn ||
|
||||
FLAGS_test_multi_ops_txns || FLAGS_user_timestamp_size > 0)) {
|
||||
(FLAGS_use_full_merge_v1 || FLAGS_use_txn || FLAGS_test_multi_ops_txns ||
|
||||
FLAGS_user_timestamp_size > 0)) {
|
||||
fprintf(stderr,
|
||||
"PutEntity is currently incompatible with Merge,"
|
||||
" transactions, and user-defined timestamps\n");
|
||||
"Wide columns are incompatible with V1 Merge, transactions, and "
|
||||
"user-defined timestamps\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#ifdef GFLAGS
|
||||
|
||||
#include "db_stress_tool/db_stress_wide_merge_operator.h"
|
||||
|
||||
#include "db_stress_tool/db_stress_common.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
bool DBStressWideMergeOperator::FullMergeV3(
|
||||
const MergeOperationInputV3& merge_in,
|
||||
MergeOperationOutputV3* merge_out) const {
|
||||
assert(!merge_in.operand_list.empty());
|
||||
assert(merge_out);
|
||||
|
||||
const Slice& latest = merge_in.operand_list.back();
|
||||
|
||||
if (latest.size() < sizeof(uint32_t)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const uint32_t value_base = GetValueBase(latest);
|
||||
|
||||
if (FLAGS_use_put_entity_one_in == 0 ||
|
||||
(value_base % FLAGS_use_put_entity_one_in) != 0) {
|
||||
merge_out->new_value = latest;
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto columns = GenerateWideColumns(value_base, latest);
|
||||
|
||||
merge_out->new_value = MergeOperationOutputV3::NewColumns();
|
||||
auto& new_columns =
|
||||
std::get<MergeOperationOutputV3::NewColumns>(merge_out->new_value);
|
||||
new_columns.reserve(columns.size());
|
||||
|
||||
for (const auto& column : columns) {
|
||||
new_columns.emplace_back(column.name().ToString(),
|
||||
column.value().ToString());
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
#endif // GFLAGS
|
|
@ -0,0 +1,27 @@
|
|||
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "rocksdb/merge_operator.h"
|
||||
|
||||
namespace ROCKSDB_NAMESPACE {
|
||||
|
||||
// A test merge operator that implements the wide-column aware FullMergeV3
|
||||
// interface. Similarly to the simple "put" type merge operators, the merge
|
||||
// result is based on the last merge operand; however, the merge result can
|
||||
// potentially be a wide-column entity, depending on the value base encoded into
|
||||
// the merge operand and the value of the "use_put_entity_one_in" stress test
|
||||
// option. Following the same rule as for writes ensures that the queries
|
||||
// issued by the validation logic receive the expected results.
|
||||
class DBStressWideMergeOperator : public MergeOperator {
|
||||
public:
|
||||
bool FullMergeV3(const MergeOperationInputV3& merge_in,
|
||||
MergeOperationOutputV3* merge_out) const override;
|
||||
|
||||
const char* Name() const override { return "DBStressWideMergeOperator"; }
|
||||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
|
@ -1279,7 +1279,11 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
|
||||
Status s;
|
||||
|
||||
if (FLAGS_use_merge) {
|
||||
if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
s = db_->PutEntity(write_opts, cfh, k,
|
||||
GenerateWideColumns(value_base, v));
|
||||
} else if (FLAGS_use_merge) {
|
||||
if (!FLAGS_use_txn) {
|
||||
if (FLAGS_user_timestamp_size == 0) {
|
||||
s = db_->Merge(write_opts, cfh, k, v);
|
||||
|
@ -1291,10 +1295,6 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
return txn.Merge(cfh, k, v);
|
||||
});
|
||||
}
|
||||
} else if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
s = db_->PutEntity(write_opts, cfh, k,
|
||||
GenerateWideColumns(value_base, v));
|
||||
} else {
|
||||
if (!FLAGS_use_txn) {
|
||||
if (FLAGS_user_timestamp_size == 0) {
|
||||
|
@ -1542,11 +1542,8 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
const Slice k(key_str);
|
||||
const Slice v(value, value_len);
|
||||
|
||||
const bool use_put_entity =
|
||||
!FLAGS_use_merge && FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0;
|
||||
|
||||
if (use_put_entity) {
|
||||
if (FLAGS_use_put_entity_one_in > 0 &&
|
||||
(value_base % FLAGS_use_put_entity_one_in) == 0) {
|
||||
WideColumns columns = GenerateWideColumns(value_base, v);
|
||||
s = sst_file_writer.PutEntity(k, columns);
|
||||
} else {
|
||||
|
|
1
src.mk
1
src.mk
|
@ -380,6 +380,7 @@ STRESS_LIB_SOURCES = \
|
|||
db_stress_tool/db_stress_stat.cc \
|
||||
db_stress_tool/db_stress_test_base.cc \
|
||||
db_stress_tool/db_stress_tool.cc \
|
||||
db_stress_tool/db_stress_wide_merge_operator.cc \
|
||||
db_stress_tool/expected_state.cc \
|
||||
db_stress_tool/expected_value.cc \
|
||||
db_stress_tool/no_batched_ops_stress.cc \
|
||||
|
|
|
@ -673,9 +673,8 @@ def finalize_and_sanitize(src_params):
|
|||
if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0:
|
||||
dest_params["sync_fault_injection"] = 0
|
||||
dest_params["manual_wal_flush_one_in"] = 0
|
||||
# PutEntity is currently incompatible with Merge
|
||||
# Wide column stress tests require FullMergeV3
|
||||
if dest_params["use_put_entity_one_in"] != 0:
|
||||
dest_params["use_merge"] = 0
|
||||
dest_params["use_full_merge_v1"] = 0
|
||||
if dest_params["file_checksum_impl"] == "none":
|
||||
dest_params["verify_file_checksums_one_in"] = 0
|
||||
|
|
Loading…
Reference in New Issue