Extend the stress test coverage of MultiGetEntity (#11336)

Summary:
Similarly to `GetEntity` prior to https://github.com/facebook/rocksdb/issues/11303, the `MultiGetEntity` API is currently
only used in the DB verification logic of the stress tests. The patch introduces
a new mode where all point lookups are performed using `MultiGetEntity`,
and implements the corresponding logic in the non-batched, batched, and
CF consistency tests.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11336

Test Plan: Ran simple blackbox tests for the various stress test flavors.

Reviewed By: akankshamahajan15

Differential Revision: D44513285

Pulled By: ltamasi

fbshipit-source-id: c3db098501bf875b6a356b09fc676a0268d92c35
This commit is contained in:
Levi Tamasi 2023-03-29 20:35:15 -07:00 committed by Facebook GitHub Bot
parent c14eb134ed
commit 0efd7b4ba1
10 changed files with 432 additions and 26 deletions

View File

@ -308,34 +308,10 @@ class BatchedOpsStressTest : public StressTest {
}
}
// Compare columns ignoring the last character of column values
auto compare = [](const WideColumns& lhs, const WideColumns& rhs) {
if (lhs.size() != rhs.size()) {
return false;
}
for (size_t i = 0; i < lhs.size(); ++i) {
if (lhs[i].name() != rhs[i].name()) {
return false;
}
if (lhs[i].value().size() != rhs[i].value().size()) {
return false;
}
if (lhs[i].value().difference_offset(rhs[i].value()) <
lhs[i].value().size() - 1) {
return false;
}
}
return true;
};
for (size_t i = 0; i < num_keys; ++i) {
const WideColumns& columns = results[i].columns();
if (!compare(results[0].columns(), columns)) {
if (!CompareColumns(results[0].columns(), columns)) {
fprintf(stderr,
"GetEntity error: inconsistent entities for key %s: %s, %s\n",
StringToHex(key_suffix).c_str(),
@ -372,6 +348,99 @@ class BatchedOpsStressTest : public StressTest {
}
}
void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
assert(thread);
assert(!rand_column_families.empty());
assert(rand_column_families[0] >= 0);
assert(rand_column_families[0] < static_cast<int>(column_families_.size()));
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
assert(!rand_keys.empty());
ManagedSnapshot snapshot_guard(db_);
ReadOptions read_opts_copy(read_opts);
read_opts_copy.snapshot = snapshot_guard.snapshot();
const size_t num_keys = rand_keys.size();
for (size_t i = 0; i < num_keys; ++i) {
const std::string key_suffix = Key(rand_keys[i]);
constexpr size_t num_prefixes = 10;
std::array<std::string, num_prefixes> keys;
std::array<Slice, num_prefixes> key_slices;
std::array<PinnableWideColumns, num_prefixes> results;
std::array<Status, num_prefixes> statuses;
for (size_t j = 0; j < num_prefixes; ++j) {
keys[j] = std::to_string(j) + key_suffix;
key_slices[j] = keys[j];
}
db_->MultiGetEntity(read_opts_copy, cfh, num_prefixes, key_slices.data(),
results.data(), statuses.data());
for (size_t j = 0; j < num_prefixes; ++j) {
const Status& s = statuses[j];
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "MultiGetEntity error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
} else {
thread->stats.AddGets(1, 1);
}
const WideColumns& cmp_columns = results[0].columns();
const WideColumns& columns = results[j].columns();
if (!CompareColumns(cmp_columns, columns)) {
fprintf(stderr,
"MultiGetEntity error: inconsistent entities for key %s: %s, "
"%s\n",
StringToHex(key_suffix).c_str(),
WideColumnsToHex(cmp_columns).c_str(),
WideColumnsToHex(columns).c_str());
}
if (!columns.empty()) {
// The last character of each column value should be 'j' as a decimal
// digit
const char expected = static_cast<char>('0' + j);
for (const auto& column : columns) {
const Slice& value = column.value();
if (value.empty() || value[value.size() - 1] != expected) {
fprintf(stderr,
"MultiGetEntity error: incorrect column value for key "
"%s, entity %s, column value %s, expected %c\n",
StringToHex(key_suffix).c_str(),
WideColumnsToHex(columns).c_str(),
value.ToString(/* hex */ true).c_str(), expected);
}
}
if (!VerifyWideColumns(columns)) {
fprintf(stderr,
"MultiGetEntity error: inconsistent columns for key %s, "
"entity %s\n",
StringToHex(key_suffix).c_str(),
WideColumnsToHex(columns).c_str());
}
}
}
}
}
// Given a key, this does prefix scans for "0"+P, "1"+P, ..., "9"+P
// in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// of the key. Each of these 10 scans returns a series of values;
@ -493,6 +562,30 @@ class BatchedOpsStressTest : public StressTest {
void VerifyDb(ThreadState* /* thread */) const override {}
void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {}
// Compare columns ignoring the last character of column values
bool CompareColumns(const WideColumns& lhs, const WideColumns& rhs) {
if (lhs.size() != rhs.size()) {
return false;
}
for (size_t i = 0; i < lhs.size(); ++i) {
if (lhs[i].name() != rhs[i].name()) {
return false;
}
if (lhs[i].value().size() != rhs[i].value().size()) {
return false;
}
if (lhs[i].value().difference_offset(rhs[i].value()) <
lhs[i].value().size() - 1) {
return false;
}
}
return true;
}
};
StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); }

View File

@ -391,6 +391,129 @@ class CfConsistencyStressTest : public StressTest {
}
}
void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
assert(thread);
assert(thread->shared);
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
ManagedSnapshot snapshot_guard(db_);
ReadOptions read_opts_copy = read_opts;
read_opts_copy.snapshot = snapshot_guard.snapshot();
const size_t num_cfs = rand_column_families.size();
std::vector<ColumnFamilyHandle*> cfhs;
cfhs.reserve(num_cfs);
for (size_t j = 0; j < num_cfs; ++j) {
assert(rand_column_families[j] >= 0);
assert(rand_column_families[j] <
static_cast<int>(column_families_.size()));
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[j]];
assert(cfh);
cfhs.emplace_back(cfh);
}
const size_t num_keys = rand_keys.size();
for (size_t i = 0; i < num_keys; ++i) {
const std::string key = Key(rand_keys[i]);
std::vector<Slice> key_slices(num_cfs, key);
std::vector<PinnableWideColumns> results(num_cfs);
std::vector<Status> statuses(num_cfs);
db_->MultiGetEntity(read_opts_copy, num_cfs, cfhs.data(),
key_slices.data(), results.data(), statuses.data());
bool is_consistent = true;
for (size_t j = 0; j < num_cfs; ++j) {
const Status& s = statuses[j];
const Status& cmp_s = statuses[0];
const WideColumns& columns = results[j].columns();
const WideColumns& cmp_columns = results[0].columns();
if (!s.ok() && !s.IsNotFound()) {
fprintf(stderr, "TestMultiGetEntity error: %s\n",
s.ToString().c_str());
thread->stats.AddErrors(1);
break;
}
assert(cmp_s.ok() || cmp_s.IsNotFound());
if (s.IsNotFound()) {
if (cmp_s.ok()) {
fprintf(
stderr,
"MultiGetEntity returns different results for key %s: CF %s "
"returns entity %s, CF %s returns not found\n",
StringToHex(key).c_str(), column_family_names_[0].c_str(),
WideColumnsToHex(cmp_columns).c_str(),
column_family_names_[j].c_str());
is_consistent = false;
break;
}
continue;
}
assert(s.ok());
if (cmp_s.IsNotFound()) {
fprintf(stderr,
"MultiGetEntity returns different results for key %s: CF %s "
"returns not found, CF %s returns entity %s\n",
StringToHex(key).c_str(), column_family_names_[0].c_str(),
column_family_names_[j].c_str(),
WideColumnsToHex(columns).c_str());
is_consistent = false;
break;
}
if (columns != cmp_columns) {
fprintf(stderr,
"MultiGetEntity returns different results for key %s: CF %s "
"returns entity %s, CF %s returns entity %s\n",
StringToHex(key).c_str(), column_family_names_[0].c_str(),
WideColumnsToHex(cmp_columns).c_str(),
column_family_names_[j].c_str(),
WideColumnsToHex(columns).c_str());
is_consistent = false;
break;
}
if (!VerifyWideColumns(columns)) {
fprintf(stderr,
"MultiGetEntity error: inconsistent columns for key %s, "
"entity %s\n",
StringToHex(key).c_str(), WideColumnsToHex(columns).c_str());
is_consistent = false;
break;
}
}
if (!is_consistent) {
fprintf(stderr,
"TestMultiGetEntity error: results are not consistent\n");
thread->stats.AddErrors(1);
// Fail fast to preserve the DB state.
thread->shared->SetVerificationFailure();
break;
} else if (statuses[0].ok()) {
thread->stats.AddGets(1, 1);
} else if (statuses[0].IsNotFound()) {
thread->stats.AddGets(1, 0);
}
}
}
Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {

View File

@ -214,6 +214,7 @@ DECLARE_uint64(snapshot_hold_ops);
DECLARE_bool(long_running_snapshots);
DECLARE_bool(use_multiget);
DECLARE_bool(use_get_entity);
DECLARE_bool(use_multi_get_entity);
DECLARE_int32(readpercent);
DECLARE_int32(prefixpercent);
DECLARE_int32(writepercent);

View File

@ -749,6 +749,9 @@ DEFINE_bool(use_multiget, false,
DEFINE_bool(use_get_entity, false, "If set, use the GetEntity API for reads");
DEFINE_bool(use_multi_get_entity, false,
"If set, use the MultiGetEntity API for reads");
static bool ValidateInt32Percent(const char* flagname, int32_t value) {
if (value < 0 || value > 100) {
fprintf(stderr, "Invalid value for --%s: %d, 0<= pct <=100 \n", flagname,

View File

@ -985,7 +985,22 @@ void StressTest::OperateDb(ThreadState* thread) {
if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
assert(0 <= prob_op);
// OPERATION read
if (FLAGS_use_get_entity) {
if (FLAGS_use_multi_get_entity) {
constexpr uint64_t max_batch_size = 64;
const uint64_t batch_size = std::min(
static_cast<uint64_t>(thread->rand.Uniform(max_batch_size)) + 1,
ops_per_open - i);
assert(batch_size >= 1);
assert(batch_size <= max_batch_size);
assert(i + batch_size <= ops_per_open);
rand_keys = GenerateNKeys(thread, static_cast<int>(batch_size), i);
TestMultiGetEntity(thread, read_opts, rand_column_families,
rand_keys);
i += batch_size - 1;
} else if (FLAGS_use_get_entity) {
TestGetEntity(thread, read_opts, rand_column_families, rand_keys);
} else if (FLAGS_use_multiget) {
// Leave room for one more iteration of the loop with a single key
@ -2387,6 +2402,8 @@ void StressTest::PrintEnv() const {
FLAGS_use_multiget ? "true" : "false");
fprintf(stdout, "Use GetEntity : %s\n",
FLAGS_use_get_entity ? "true" : "false");
fprintf(stdout, "Use MultiGetEntity : %s\n",
FLAGS_use_multi_get_entity ? "true" : "false");
const char* memtablerep = "";
switch (FLAGS_rep_factory) {

View File

@ -98,6 +98,11 @@ class StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual void TestMultiGetEntity(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,

View File

@ -393,6 +393,12 @@ void MultiOpsTxnsStressTest::TestGetEntity(
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */) {}
// Wide columns are currently not supported by transactions.
void MultiOpsTxnsStressTest::TestMultiGetEntity(
ThreadState* /* thread */, const ReadOptions& /* read_opts */,
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */) {}
Status MultiOpsTxnsStressTest::TestPrefixScan(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,

View File

@ -214,6 +214,10 @@ class MultiOpsTxnsStressTest : public StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override;
void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override;
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override;

View File

@ -847,6 +847,157 @@ class NonBatchedOpsStressTest : public StressTest {
}
}
void TestMultiGetEntity(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
assert(thread);
ManagedSnapshot snapshot_guard(db_);
ReadOptions read_opts_copy(read_opts);
read_opts_copy.snapshot = snapshot_guard.snapshot();
assert(!rand_column_families.empty());
assert(rand_column_families[0] >= 0);
assert(rand_column_families[0] < static_cast<int>(column_families_.size()));
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
assert(!rand_keys.empty());
const size_t num_keys = rand_keys.size();
std::vector<std::string> keys(num_keys);
std::vector<Slice> key_slices(num_keys);
for (size_t i = 0; i < num_keys; ++i) {
keys[i] = Key(rand_keys[i]);
key_slices[i] = keys[i];
}
std::vector<PinnableWideColumns> results(num_keys);
std::vector<Status> statuses(num_keys);
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false;
}
db_->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(),
results.data(), statuses.data());
int error_count = 0;
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
if (error_count && !SharedState::ignore_read_error) {
int stat_nok = 0;
for (const auto& s : statuses) {
if (!s.ok() && !s.IsNotFound()) {
stat_nok++;
}
}
if (stat_nok < error_count) {
// Grab mutex so multiple threads don't try to print the
// stack trace at the same time
assert(thread->shared);
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "Didn't get expected error from MultiGetEntity\n");
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n",
num_keys, error_count, stat_nok);
fprintf(stderr, "Call stack that injected the fault\n");
fault_fs_guard->PrintFaultBacktrace();
std::terminate();
}
}
fault_fs_guard->DisableErrorInjection();
}
const bool check_get_entity = !error_count && thread->rand.OneIn(4);
for (size_t i = 0; i < num_keys; ++i) {
const Status& s = statuses[i];
bool is_consistent = true;
if (s.ok() && !VerifyWideColumns(results[i].columns())) {
fprintf(
stderr,
"error : inconsistent columns returned by MultiGetEntity for key "
"%s: %s\n",
StringToHex(keys[i]).c_str(),
WideColumnsToHex(results[i].columns()).c_str());
is_consistent = false;
} else if (check_get_entity && (s.ok() || s.IsNotFound())) {
PinnableWideColumns cmp_result;
const Status cmp_s =
db_->GetEntity(read_opts_copy, cfh, key_slices[i], &cmp_result);
if (!cmp_s.ok() && !cmp_s.IsNotFound()) {
fprintf(stderr, "GetEntity error: %s\n", cmp_s.ToString().c_str());
is_consistent = false;
} else if (cmp_s.IsNotFound()) {
if (s.ok()) {
fprintf(stderr,
"Inconsistent results for key %s: MultiGetEntity returned "
"ok, GetEntity returned not found\n",
StringToHex(keys[i]).c_str());
is_consistent = false;
}
} else {
assert(cmp_s.ok());
if (s.IsNotFound()) {
fprintf(stderr,
"Inconsistent results for key %s: MultiGetEntity returned "
"not found, GetEntity returned ok\n",
StringToHex(keys[i]).c_str());
is_consistent = false;
} else {
assert(s.ok());
if (results[i] != cmp_result) {
fprintf(
stderr,
"Inconsistent results for key %s: MultiGetEntity returned "
"%s, GetEntity returned %s\n",
StringToHex(keys[i]).c_str(),
WideColumnsToHex(results[i].columns()).c_str(),
WideColumnsToHex(cmp_result.columns()).c_str());
is_consistent = false;
}
}
}
}
if (!is_consistent) {
fprintf(stderr,
"TestMultiGetEntity error: results are not consistent\n");
thread->stats.AddErrors(1);
// Fail fast to preserve the DB state
thread->shared->SetVerificationFailure();
break;
} else if (s.ok()) {
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
} else {
if (error_count == 0) {
fprintf(stderr, "MultiGetEntity error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddVerifiedErrors(1);
}
}
}
}
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {

View File

@ -137,6 +137,7 @@ default_params = {
"index_block_restart_interval": lambda: random.choice(range(1, 16)),
"use_multiget": lambda: random.randint(0, 1),
"use_get_entity": lambda: random.choice([0] * 7 + [1]),
"use_multi_get_entity": lambda: random.choice([0] * 7 + [1]),
"periodic_compaction_seconds": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
# 0 = never (used by some), 10 = often (for threading bugs), 600 = default
"stats_dump_period_sec": lambda: random.choice([0, 10, 600]),
@ -462,6 +463,8 @@ multiops_txn_default_params = {
"sync_fault_injection": 0,
# PutEntity in transactions is not yet implemented
"use_put_entity_one_in" : 0,
"use_get_entity" : 0,
"use_multi_get_entity" : 0,
}
multiops_wc_txn_params = {