mirror of https://github.com/facebook/rocksdb.git
Add transactional/read-your-own-write MultiGetEntity stress test (#12717)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/12717 The PR adds `Transaction::MultiGetEntity` to the stress tests. Similarly to what we do for `Transaction::MultiGet`, in this mode we open a transaction and randomly add writes for some of the queried keys to it while keeping track of the values written on a per-key basis. The results of `Transaction::MultiGetEntity` can then be validated against these expected values (in order to test the read-your-own-writes functionality) as well as the results returned by `Transaction::GetEntity` for the same keys. Reviewed By: jaykorean Differential Revision: D57990210 fbshipit-source-id: 9bf3bb292051c2c57757f86b517919197b03c524
This commit is contained in:
parent
a901ef48f0
commit
6f17056e40
|
@ -1044,27 +1044,48 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
read_opts_copy.snapshot = snapshot_guard.snapshot();
|
||||
|
||||
assert(!rand_column_families.empty());
|
||||
assert(rand_column_families[0] >= 0);
|
||||
assert(rand_column_families[0] < static_cast<int>(column_families_.size()));
|
||||
|
||||
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
|
||||
const int column_family = rand_column_families[0];
|
||||
|
||||
assert(column_family >= 0);
|
||||
assert(column_family < static_cast<int>(column_families_.size()));
|
||||
|
||||
ColumnFamilyHandle* const cfh = column_families_[column_family];
|
||||
assert(cfh);
|
||||
|
||||
assert(!rand_keys.empty());
|
||||
|
||||
const size_t num_keys = rand_keys.size();
|
||||
|
||||
std::vector<std::string> keys(num_keys);
|
||||
std::vector<Slice> key_slices(num_keys);
|
||||
std::unique_ptr<Transaction> txn;
|
||||
|
||||
if (fault_fs_guard) {
|
||||
fault_fs_guard->EnableErrorInjection();
|
||||
SharedState::ignore_read_error = false;
|
||||
if (FLAGS_use_txn) {
|
||||
WriteOptions write_options;
|
||||
if (FLAGS_rate_limit_auto_wal_flush) {
|
||||
write_options.rate_limiter_priority = Env::IO_USER;
|
||||
}
|
||||
|
||||
const Status s = NewTxn(write_options, &txn);
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str());
|
||||
thread->shared->SafeTerminate();
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<std::string> keys(num_keys);
|
||||
std::vector<Slice> key_slices(num_keys);
|
||||
std::unordered_map<std::string, ExpectedValue> ryw_expected_values;
|
||||
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
keys[i] = Key(rand_keys[i]);
|
||||
const int64_t key = rand_keys[i];
|
||||
|
||||
keys[i] = Key(key);
|
||||
key_slices[i] = keys[i];
|
||||
|
||||
if (FLAGS_use_txn) {
|
||||
MaybeAddKeyToTxnForRYW(thread, column_family, key, txn.get(),
|
||||
ryw_expected_values);
|
||||
}
|
||||
}
|
||||
|
||||
int error_count = 0;
|
||||
|
@ -1099,7 +1120,8 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
}
|
||||
};
|
||||
|
||||
auto check_results = [&](auto get_columns, auto get_status) {
|
||||
auto check_results = [&](auto get_columns, auto get_status,
|
||||
auto do_extra_check, auto call_get_entity) {
|
||||
const bool check_get_entity =
|
||||
!error_count && FLAGS_check_multiget_entity_consistency;
|
||||
|
||||
|
@ -1116,50 +1138,54 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
"%s: %s\n",
|
||||
StringToHex(keys[i]).c_str(), WideColumnsToHex(columns).c_str());
|
||||
is_consistent = false;
|
||||
} else if (check_get_entity && (s.ok() || s.IsNotFound())) {
|
||||
PinnableWideColumns cmp_result;
|
||||
ThreadStatusUtil::SetThreadOperation(
|
||||
ThreadStatus::OperationType::OP_GETENTITY);
|
||||
const Status cmp_s =
|
||||
db_->GetEntity(read_opts_copy, cfh, key_slices[i], &cmp_result);
|
||||
|
||||
if (!cmp_s.ok() && !cmp_s.IsNotFound()) {
|
||||
fprintf(stderr, "GetEntity error: %s\n", cmp_s.ToString().c_str());
|
||||
} else if (s.ok() || s.IsNotFound()) {
|
||||
if (!do_extra_check(keys[i], columns, s)) {
|
||||
is_consistent = false;
|
||||
} else if (cmp_s.IsNotFound()) {
|
||||
if (s.ok()) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity returned "
|
||||
"ok, GetEntity returned not found\n",
|
||||
StringToHex(keys[i]).c_str());
|
||||
} else if (check_get_entity) {
|
||||
PinnableWideColumns cmp_result;
|
||||
ThreadStatusUtil::SetThreadOperation(
|
||||
ThreadStatus::OperationType::OP_GETENTITY);
|
||||
const Status cmp_s = call_get_entity(key_slices[i], &cmp_result);
|
||||
|
||||
if (!cmp_s.ok() && !cmp_s.IsNotFound()) {
|
||||
fprintf(stderr, "GetEntity error: %s\n",
|
||||
cmp_s.ToString().c_str());
|
||||
is_consistent = false;
|
||||
}
|
||||
} else {
|
||||
assert(cmp_s.ok());
|
||||
|
||||
if (s.IsNotFound()) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity returned "
|
||||
"not found, GetEntity returned ok\n",
|
||||
StringToHex(keys[i]).c_str());
|
||||
is_consistent = false;
|
||||
} else {
|
||||
assert(s.ok());
|
||||
|
||||
const WideColumns& cmp_columns = cmp_result.columns();
|
||||
|
||||
if (columns != cmp_columns) {
|
||||
} else if (cmp_s.IsNotFound()) {
|
||||
if (s.ok()) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity returned "
|
||||
"%s, GetEntity returned %s\n",
|
||||
StringToHex(keys[i]).c_str(),
|
||||
WideColumnsToHex(columns).c_str(),
|
||||
WideColumnsToHex(cmp_columns).c_str());
|
||||
"ok, GetEntity returned not found\n",
|
||||
StringToHex(keys[i]).c_str());
|
||||
is_consistent = false;
|
||||
}
|
||||
} else {
|
||||
assert(cmp_s.ok());
|
||||
|
||||
if (s.IsNotFound()) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity returned "
|
||||
"not found, GetEntity returned ok\n",
|
||||
StringToHex(keys[i]).c_str());
|
||||
is_consistent = false;
|
||||
} else {
|
||||
assert(s.ok());
|
||||
|
||||
const WideColumns& cmp_columns = cmp_result.columns();
|
||||
|
||||
if (columns != cmp_columns) {
|
||||
fprintf(stderr,
|
||||
"Inconsistent results for key %s: MultiGetEntity "
|
||||
"returned "
|
||||
"%s, GetEntity returned %s\n",
|
||||
StringToHex(keys[i]).c_str(),
|
||||
WideColumnsToHex(columns).c_str(),
|
||||
WideColumnsToHex(cmp_columns).c_str());
|
||||
is_consistent = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1186,9 +1212,91 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
}
|
||||
};
|
||||
|
||||
if (FLAGS_use_attribute_group) {
|
||||
if (FLAGS_use_txn) {
|
||||
// Transactional/read-your-own-writes MultiGetEntity verification
|
||||
std::vector<PinnableWideColumns> results(num_keys);
|
||||
std::vector<Status> statuses(num_keys);
|
||||
|
||||
assert(txn);
|
||||
txn->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(),
|
||||
results.data(), statuses.data());
|
||||
|
||||
auto ryw_check = [&](const std::string& key, const WideColumns& columns,
|
||||
const Status& s) -> bool {
|
||||
const auto it = ryw_expected_values.find(key);
|
||||
if (it == ryw_expected_values.end()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const auto& ryw_expected_value = it->second;
|
||||
|
||||
if (s.ok()) {
|
||||
if (ryw_expected_value.IsDeleted()) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"MultiGetEntity failed the read-your-own-write check for key "
|
||||
"%s\n",
|
||||
Slice(key).ToString(true).c_str());
|
||||
fprintf(stderr,
|
||||
"MultiGetEntity returned ok, transaction has non-committed "
|
||||
"delete\n");
|
||||
return false;
|
||||
} else {
|
||||
const uint32_t value_base = ryw_expected_value.GetValueBase();
|
||||
char expected_value[100];
|
||||
const size_t sz = GenerateValue(value_base, expected_value,
|
||||
sizeof(expected_value));
|
||||
const Slice expected_slice(expected_value, sz);
|
||||
const WideColumns expected_columns =
|
||||
GenerateExpectedWideColumns(value_base, expected_slice);
|
||||
|
||||
if (columns != expected_columns) {
|
||||
fprintf(
|
||||
stderr,
|
||||
"MultiGetEntity failed the read-your-own-write check for key "
|
||||
"%s\n",
|
||||
Slice(key).ToString(true).c_str());
|
||||
fprintf(stderr, "MultiGetEntity returned %s\n",
|
||||
WideColumnsToHex(columns).c_str());
|
||||
fprintf(stderr, "Transaction has non-committed write %s\n",
|
||||
WideColumnsToHex(expected_columns).c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
assert(s.IsNotFound());
|
||||
if (!ryw_expected_value.IsDeleted()) {
|
||||
fprintf(stderr,
|
||||
"MultiGetEntity failed the read-your-own-write check for key "
|
||||
"%s\n",
|
||||
Slice(key).ToString(true).c_str());
|
||||
fprintf(stderr,
|
||||
"MultiGetEntity returned not found, transaction has "
|
||||
"non-committed write\n");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
check_results([&](size_t i) { return results[i].columns(); },
|
||||
[&](size_t i) { return statuses[i]; }, ryw_check,
|
||||
[&](const Slice& key, PinnableWideColumns* result) {
|
||||
return txn->GetEntity(read_opts_copy, cfh, key, result);
|
||||
});
|
||||
|
||||
txn->Rollback().PermitUncheckedError();
|
||||
} else if (FLAGS_use_attribute_group) {
|
||||
// AttributeGroup MultiGetEntity verification
|
||||
|
||||
if (fault_fs_guard) {
|
||||
fault_fs_guard->EnableErrorInjection();
|
||||
SharedState::ignore_read_error = false;
|
||||
}
|
||||
|
||||
std::vector<PinnableAttributeGroups> results;
|
||||
results.reserve(num_keys);
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
|
@ -1209,10 +1317,20 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
|
||||
// Compare against non-attribute-group GetEntity result
|
||||
check_results([&](size_t i) { return results[i][0].columns(); },
|
||||
[&](size_t i) { return results[i][0].status(); });
|
||||
[&](size_t i) { return results[i][0].status(); },
|
||||
[](const Slice& /* key */, const WideColumns& /* columns */,
|
||||
const Status& /* s */) { return true; },
|
||||
[&](const Slice& key, PinnableWideColumns* result) {
|
||||
return db_->GetEntity(read_opts_copy, cfh, key, result);
|
||||
});
|
||||
} else {
|
||||
// Non-AttributeGroup MultiGetEntity verification
|
||||
|
||||
if (fault_fs_guard) {
|
||||
fault_fs_guard->EnableErrorInjection();
|
||||
SharedState::ignore_read_error = false;
|
||||
}
|
||||
|
||||
std::vector<PinnableWideColumns> results(num_keys);
|
||||
std::vector<Status> statuses(num_keys);
|
||||
|
||||
|
@ -1226,7 +1344,12 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
}
|
||||
|
||||
check_results([&](size_t i) { return results[i].columns(); },
|
||||
[&](size_t i) { return statuses[i]; });
|
||||
[&](size_t i) { return statuses[i]; },
|
||||
[](const Slice& /* key */, const WideColumns& /* columns */,
|
||||
const Status& /* s */) { return true; },
|
||||
[&](const Slice& key, PinnableWideColumns* result) {
|
||||
return db_->GetEntity(read_opts_copy, cfh, key, result);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue