mirror of https://github.com/facebook/rocksdb.git
Refactor the non-attribute-group/attribute-group code paths in NonBatchedOpsStressTest::TestMultiGetEntity (#12715)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/12715 The patch refactors/deduplicates the non-attribute-group and attribute-group code paths in `NonBatchedOpsStressTest::TestMultiGetEntity` by introducing two new generic lambdas `verify_expected_errors` and `check_results` (the latter of which subsumes the existing `handle_results`) that can handle both types of APIs. This change also serves as groundwork for the upcoming transactional `MultiGetEntity` stress tests. Reviewed By: jaykorean Differential Revision: D57977700 fbshipit-source-id: 83a18a9e57f46ea92ba07b2f0dca3e9bc353f257
This commit is contained in:
parent
0ae3d9f98d
commit
01179678b2
|
@ -1069,196 +1069,52 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
|
||||
int error_count = 0;
|
||||
|
||||
auto handle_result = [](ThreadState* _thread, const Status& s,
|
||||
bool is_consistent, int err_count) {
|
||||
if (!is_consistent) {
|
||||
fprintf(stderr,
|
||||
"TestMultiGetEntity%s error: results are not consistent\n",
|
||||
FLAGS_use_attribute_group ? "(AttributeGroup)" : "");
|
||||
_thread->stats.AddErrors(1);
|
||||
// Fail fast to preserve the DB state
|
||||
_thread->shared->SetVerificationFailure();
|
||||
} else if (s.ok()) {
|
||||
_thread->stats.AddGets(1, 1);
|
||||
} else if (s.IsNotFound()) {
|
||||
_thread->stats.AddGets(1, 0);
|
||||
} else {
|
||||
if (err_count == 0) {
|
||||
fprintf(stderr, "MultiGetEntity%s error: %s\n",
|
||||
FLAGS_use_attribute_group ? "(AttributeGroup)" : "",
|
||||
s.ToString().c_str());
|
||||
_thread->stats.AddErrors(1);
|
||||
} else {
|
||||
_thread->stats.AddVerifiedErrors(1);
|
||||
auto verify_expected_errors = [&](auto get_status) {
|
||||
assert(fault_fs_guard);
|
||||
|
||||
error_count = fault_fs_guard->GetAndResetErrorCount();
|
||||
|
||||
if (error_count && !SharedState::ignore_read_error) {
|
||||
int stat_nok = 0;
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
const Status& s = get_status(i);
|
||||
if (!s.ok() && !s.IsNotFound()) {
|
||||
++stat_nok;
|
||||
}
|
||||
}
|
||||
|
||||
if (stat_nok < error_count) {
|
||||
// Grab mutex so multiple threads don't try to print the
|
||||
// stack trace at the same time
|
||||
assert(thread->shared);
|
||||
MutexLock l(thread->shared->GetMutex());
|
||||
|
||||
fprintf(stderr, "Didn't get expected error from MultiGetEntity\n");
|
||||
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n",
|
||||
num_keys, error_count, stat_nok);
|
||||
fprintf(stderr, "Call stack that injected the fault\n");
|
||||
fault_fs_guard->PrintFaultBacktrace();
|
||||
std::terminate();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (FLAGS_use_attribute_group) {
|
||||
// AttributeGroup MultiGetEntity verification
|
||||
|
||||
std::vector<PinnableAttributeGroups> results;
|
||||
results.reserve(num_keys);
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
PinnableAttributeGroups attribute_groups;
|
||||
attribute_groups.emplace_back(cfh);
|
||||
results.emplace_back(std::move(attribute_groups));
|
||||
}
|
||||
db_->MultiGetEntity(read_opts_copy, num_keys, key_slices.data(),
|
||||
results.data());
|
||||
|
||||
if (fault_fs_guard) {
|
||||
error_count = fault_fs_guard->GetAndResetErrorCount();
|
||||
|
||||
if (error_count && !SharedState::ignore_read_error) {
|
||||
int stat_nok = 0;
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
const Status& s = results[i][0].status();
|
||||
if (!s.ok() && !s.IsNotFound()) {
|
||||
stat_nok++;
|
||||
}
|
||||
}
|
||||
|
||||
if (stat_nok < error_count) {
|
||||
// Grab mutex so multiple threads don't try to print the
|
||||
// stack trace at the same time
|
||||
assert(thread->shared);
|
||||
MutexLock l(thread->shared->GetMutex());
|
||||
|
||||
fprintf(stderr,
|
||||
"Didn't get expected error from MultiGetEntity "
|
||||
"(AttributeGroup)\n");
|
||||
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n",
|
||||
num_keys, error_count, stat_nok);
|
||||
fprintf(stderr, "Call stack that injected the fault\n");
|
||||
fault_fs_guard->PrintFaultBacktrace();
|
||||
std::terminate();
|
||||
}
|
||||
}
|
||||
fault_fs_guard->DisableErrorInjection();
|
||||
}
|
||||
|
||||
// Compare against non-attribute-group GetEntity result
|
||||
auto check_results = [&](auto get_columns, auto get_status) {
|
||||
const bool check_get_entity =
|
||||
!error_count && FLAGS_check_multiget_entity_consistency;
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
assert(results[i].size() == 1);
|
||||
const Status& s = results[i][0].status();
|
||||
const WideColumns& columns = get_columns(i);
|
||||
const Status& s = get_status(i);
|
||||
|
||||
bool is_consistent = true;
|
||||
|
||||
if (s.ok() && !VerifyWideColumns(results[i][0].columns())) {
|
||||
fprintf(stderr,
|
||||
"error : inconsistent columns returned by MultiGetEntity "
|
||||
"(AttributeGroup) for key "
|
||||
"%s: %s\n",
|
||||
StringToHex(keys[i]).c_str(),
|
||||
WideColumnsToHex(results[i][0].columns()).c_str());
|
||||
is_consistent = false;
|
||||
} else if (check_get_entity && (s.ok() || s.IsNotFound())) {
|
||||
PinnableWideColumns cmp_result;
|
||||
ThreadStatusUtil::SetThreadOperation(
|
||||
ThreadStatus::OperationType::OP_GETENTITY);
|
||||
const Status cmp_s =
|
||||
db_->GetEntity(read_opts_copy, cfh, key_slices[i], &cmp_result);
|
||||
|
||||
if (!cmp_s.ok() && !cmp_s.IsNotFound()) {
|
||||
fprintf(stderr, "GetEntity error: %s\n", cmp_s.ToString().c_str());
|
||||
is_consistent = false;
|
||||
} else if (cmp_s.IsNotFound()) {
|
||||
if (s.ok()) {
|
||||
fprintf(stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity "
|
||||
"(AttributeGroup) returned "
|
||||
"ok, GetEntity returned not found\n",
|
||||
StringToHex(keys[i]).c_str());
|
||||
is_consistent = false;
|
||||
}
|
||||
} else {
|
||||
assert(cmp_s.ok());
|
||||
|
||||
if (s.IsNotFound()) {
|
||||
fprintf(stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity "
|
||||
"(AttributeGroup) returned "
|
||||
"not found, GetEntity returned ok\n",
|
||||
StringToHex(keys[i]).c_str());
|
||||
is_consistent = false;
|
||||
} else {
|
||||
assert(s.ok());
|
||||
|
||||
if (results[i][0].columns() != cmp_result.columns()) {
|
||||
fprintf(stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity "
|
||||
"(AttributeGroup) returned "
|
||||
"%s, GetEntity returned %s\n",
|
||||
StringToHex(keys[i]).c_str(),
|
||||
WideColumnsToHex(results[i][0].columns()).c_str(),
|
||||
WideColumnsToHex(cmp_result.columns()).c_str());
|
||||
is_consistent = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
handle_result(thread, s, is_consistent, error_count);
|
||||
if (!is_consistent) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Non-AttributeGroup MultiGetEntity verification
|
||||
|
||||
std::vector<PinnableWideColumns> results(num_keys);
|
||||
std::vector<Status> statuses(num_keys);
|
||||
|
||||
db_->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(),
|
||||
results.data(), statuses.data());
|
||||
|
||||
if (fault_fs_guard) {
|
||||
error_count = fault_fs_guard->GetAndResetErrorCount();
|
||||
|
||||
if (error_count && !SharedState::ignore_read_error) {
|
||||
int stat_nok = 0;
|
||||
for (const auto& s : statuses) {
|
||||
if (!s.ok() && !s.IsNotFound()) {
|
||||
stat_nok++;
|
||||
}
|
||||
}
|
||||
|
||||
if (stat_nok < error_count) {
|
||||
// Grab mutex so multiple threads don't try to print the
|
||||
// stack trace at the same time
|
||||
assert(thread->shared);
|
||||
MutexLock l(thread->shared->GetMutex());
|
||||
|
||||
fprintf(stderr, "Didn't get expected error from MultiGetEntity\n");
|
||||
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n",
|
||||
num_keys, error_count, stat_nok);
|
||||
fprintf(stderr, "Call stack that injected the fault\n");
|
||||
fault_fs_guard->PrintFaultBacktrace();
|
||||
std::terminate();
|
||||
}
|
||||
}
|
||||
|
||||
fault_fs_guard->DisableErrorInjection();
|
||||
}
|
||||
|
||||
const bool check_get_entity =
|
||||
!error_count && FLAGS_check_multiget_entity_consistency;
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
const Status& s = statuses[i];
|
||||
|
||||
bool is_consistent = true;
|
||||
|
||||
if (s.ok() && !VerifyWideColumns(results[i].columns())) {
|
||||
if (s.ok() && !VerifyWideColumns(columns)) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"error : inconsistent columns returned by MultiGetEntity for key "
|
||||
"%s: %s\n",
|
||||
StringToHex(keys[i]).c_str(),
|
||||
WideColumnsToHex(results[i].columns()).c_str());
|
||||
StringToHex(keys[i]).c_str(), WideColumnsToHex(columns).c_str());
|
||||
is_consistent = false;
|
||||
} else if (check_get_entity && (s.ok() || s.IsNotFound())) {
|
||||
PinnableWideColumns cmp_result;
|
||||
|
@ -1292,24 +1148,85 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
} else {
|
||||
assert(s.ok());
|
||||
|
||||
if (results[i] != cmp_result) {
|
||||
const WideColumns& cmp_columns = cmp_result.columns();
|
||||
|
||||
if (columns != cmp_columns) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity returned "
|
||||
"%s, GetEntity returned %s\n",
|
||||
StringToHex(keys[i]).c_str(),
|
||||
WideColumnsToHex(results[i].columns()).c_str(),
|
||||
WideColumnsToHex(cmp_result.columns()).c_str());
|
||||
WideColumnsToHex(columns).c_str(),
|
||||
WideColumnsToHex(cmp_columns).c_str());
|
||||
is_consistent = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
handle_result(thread, s, is_consistent, error_count);
|
||||
|
||||
if (!is_consistent) {
|
||||
fprintf(stderr,
|
||||
"TestMultiGetEntity error: results are not consistent\n");
|
||||
thread->stats.AddErrors(1);
|
||||
// Fail fast to preserve the DB state
|
||||
thread->shared->SetVerificationFailure();
|
||||
break;
|
||||
} else if (s.ok()) {
|
||||
thread->stats.AddGets(1, 1);
|
||||
} else if (s.IsNotFound()) {
|
||||
thread->stats.AddGets(1, 0);
|
||||
} else {
|
||||
if (error_count == 0) {
|
||||
fprintf(stderr, "MultiGetEntity error: %s\n", s.ToString().c_str());
|
||||
thread->stats.AddErrors(1);
|
||||
} else {
|
||||
thread->stats.AddVerifiedErrors(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (FLAGS_use_attribute_group) {
|
||||
// AttributeGroup MultiGetEntity verification
|
||||
|
||||
std::vector<PinnableAttributeGroups> results;
|
||||
results.reserve(num_keys);
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
PinnableAttributeGroups attribute_groups;
|
||||
attribute_groups.emplace_back(cfh);
|
||||
results.emplace_back(std::move(attribute_groups));
|
||||
}
|
||||
|
||||
db_->MultiGetEntity(read_opts_copy, num_keys, key_slices.data(),
|
||||
results.data());
|
||||
|
||||
if (fault_fs_guard) {
|
||||
verify_expected_errors(
|
||||
[&](size_t i) { return results[i][0].status(); });
|
||||
|
||||
fault_fs_guard->DisableErrorInjection();
|
||||
}
|
||||
|
||||
// Compare against non-attribute-group GetEntity result
|
||||
check_results([&](size_t i) { return results[i][0].columns(); },
|
||||
[&](size_t i) { return results[i][0].status(); });
|
||||
} else {
|
||||
// Non-AttributeGroup MultiGetEntity verification
|
||||
|
||||
std::vector<PinnableWideColumns> results(num_keys);
|
||||
std::vector<Status> statuses(num_keys);
|
||||
|
||||
db_->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(),
|
||||
results.data(), statuses.data());
|
||||
|
||||
if (fault_fs_guard) {
|
||||
verify_expected_errors([&](size_t i) { return statuses[i]; });
|
||||
|
||||
fault_fs_guard->DisableErrorInjection();
|
||||
}
|
||||
|
||||
check_results([&](size_t i) { return results[i].columns(); },
|
||||
[&](size_t i) { return statuses[i]; });
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue