diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 5872892c37..174894c348 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -1044,27 +1044,48 @@ class NonBatchedOpsStressTest : public StressTest { read_opts_copy.snapshot = snapshot_guard.snapshot(); assert(!rand_column_families.empty()); - assert(rand_column_families[0] >= 0); - assert(rand_column_families[0] < static_cast(column_families_.size())); - ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]]; + const int column_family = rand_column_families[0]; + + assert(column_family >= 0); + assert(column_family < static_cast(column_families_.size())); + + ColumnFamilyHandle* const cfh = column_families_[column_family]; assert(cfh); assert(!rand_keys.empty()); const size_t num_keys = rand_keys.size(); - std::vector keys(num_keys); - std::vector key_slices(num_keys); + std::unique_ptr txn; - if (fault_fs_guard) { - fault_fs_guard->EnableErrorInjection(); - SharedState::ignore_read_error = false; + if (FLAGS_use_txn) { + WriteOptions write_options; + if (FLAGS_rate_limit_auto_wal_flush) { + write_options.rate_limiter_priority = Env::IO_USER; + } + + const Status s = NewTxn(write_options, &txn); + if (!s.ok()) { + fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str()); + thread->shared->SafeTerminate(); + } } + std::vector keys(num_keys); + std::vector key_slices(num_keys); + std::unordered_map ryw_expected_values; + for (size_t i = 0; i < num_keys; ++i) { - keys[i] = Key(rand_keys[i]); + const int64_t key = rand_keys[i]; + + keys[i] = Key(key); key_slices[i] = keys[i]; + + if (FLAGS_use_txn) { + MaybeAddKeyToTxnForRYW(thread, column_family, key, txn.get(), + ryw_expected_values); + } } int error_count = 0; @@ -1099,7 +1120,8 @@ class NonBatchedOpsStressTest : public StressTest { } }; - auto check_results = [&](auto get_columns, auto get_status) { + auto check_results = [&](auto get_columns, auto get_status, + auto do_extra_check, auto call_get_entity) { const bool check_get_entity = !error_count && FLAGS_check_multiget_entity_consistency; @@ -1116,50 +1138,54 @@ class NonBatchedOpsStressTest : public StressTest { "%s: %s\n", StringToHex(keys[i]).c_str(), WideColumnsToHex(columns).c_str()); is_consistent = false; - } else if (check_get_entity && (s.ok() || s.IsNotFound())) { - PinnableWideColumns cmp_result; - ThreadStatusUtil::SetThreadOperation( - ThreadStatus::OperationType::OP_GETENTITY); - const Status cmp_s = - db_->GetEntity(read_opts_copy, cfh, key_slices[i], &cmp_result); - - if (!cmp_s.ok() && !cmp_s.IsNotFound()) { - fprintf(stderr, "GetEntity error: %s\n", cmp_s.ToString().c_str()); + } else if (s.ok() || s.IsNotFound()) { + if (!do_extra_check(keys[i], columns, s)) { is_consistent = false; - } else if (cmp_s.IsNotFound()) { - if (s.ok()) { - fprintf( - stderr, - "Inconsistent results for key %s: MultiGetEntity returned " - "ok, GetEntity returned not found\n", - StringToHex(keys[i]).c_str()); + } else if (check_get_entity) { + PinnableWideColumns cmp_result; + ThreadStatusUtil::SetThreadOperation( + ThreadStatus::OperationType::OP_GETENTITY); + const Status cmp_s = call_get_entity(key_slices[i], &cmp_result); + + if (!cmp_s.ok() && !cmp_s.IsNotFound()) { + fprintf(stderr, "GetEntity error: %s\n", + cmp_s.ToString().c_str()); is_consistent = false; - } - } else { - assert(cmp_s.ok()); - - if (s.IsNotFound()) { - fprintf( - stderr, - "Inconsistent results for key %s: MultiGetEntity returned " - "not found, GetEntity returned ok\n", - StringToHex(keys[i]).c_str()); - is_consistent = false; - } else { - assert(s.ok()); - - const WideColumns& cmp_columns = cmp_result.columns(); - - if (columns != cmp_columns) { + } else if (cmp_s.IsNotFound()) { + if (s.ok()) { fprintf( stderr, "Inconsistent results for key %s: MultiGetEntity returned " - "%s, GetEntity returned %s\n", - StringToHex(keys[i]).c_str(), - WideColumnsToHex(columns).c_str(), - WideColumnsToHex(cmp_columns).c_str()); + "ok, GetEntity returned not found\n", + StringToHex(keys[i]).c_str()); is_consistent = false; } + } else { + assert(cmp_s.ok()); + + if (s.IsNotFound()) { + fprintf( + stderr, + "Inconsistent results for key %s: MultiGetEntity returned " + "not found, GetEntity returned ok\n", + StringToHex(keys[i]).c_str()); + is_consistent = false; + } else { + assert(s.ok()); + + const WideColumns& cmp_columns = cmp_result.columns(); + + if (columns != cmp_columns) { + fprintf(stderr, + "Inconsistent results for key %s: MultiGetEntity " + "returned " + "%s, GetEntity returned %s\n", + StringToHex(keys[i]).c_str(), + WideColumnsToHex(columns).c_str(), + WideColumnsToHex(cmp_columns).c_str()); + is_consistent = false; + } + } } } } @@ -1186,9 +1212,91 @@ class NonBatchedOpsStressTest : public StressTest { } }; - if (FLAGS_use_attribute_group) { + if (FLAGS_use_txn) { + // Transactional/read-your-own-writes MultiGetEntity verification + std::vector results(num_keys); + std::vector statuses(num_keys); + + assert(txn); + txn->MultiGetEntity(read_opts_copy, cfh, num_keys, key_slices.data(), + results.data(), statuses.data()); + + auto ryw_check = [&](const std::string& key, const WideColumns& columns, + const Status& s) -> bool { + const auto it = ryw_expected_values.find(key); + if (it == ryw_expected_values.end()) { + return true; + } + + const auto& ryw_expected_value = it->second; + + if (s.ok()) { + if (ryw_expected_value.IsDeleted()) { + fprintf( + stderr, + "MultiGetEntity failed the read-your-own-write check for key " + "%s\n", + Slice(key).ToString(true).c_str()); + fprintf(stderr, + "MultiGetEntity returned ok, transaction has non-committed " + "delete\n"); + return false; + } else { + const uint32_t value_base = ryw_expected_value.GetValueBase(); + char expected_value[100]; + const size_t sz = GenerateValue(value_base, expected_value, + sizeof(expected_value)); + const Slice expected_slice(expected_value, sz); + const WideColumns expected_columns = + GenerateExpectedWideColumns(value_base, expected_slice); + + if (columns != expected_columns) { + fprintf( + stderr, + "MultiGetEntity failed the read-your-own-write check for key " + "%s\n", + Slice(key).ToString(true).c_str()); + fprintf(stderr, "MultiGetEntity returned %s\n", + WideColumnsToHex(columns).c_str()); + fprintf(stderr, "Transaction has non-committed write %s\n", + WideColumnsToHex(expected_columns).c_str()); + return false; + } + + return true; + } + } + + assert(s.IsNotFound()); + if (!ryw_expected_value.IsDeleted()) { + fprintf(stderr, + "MultiGetEntity failed the read-your-own-write check for key " + "%s\n", + Slice(key).ToString(true).c_str()); + fprintf(stderr, + "MultiGetEntity returned not found, transaction has " + "non-committed write\n"); + return false; + } + + return true; + }; + + check_results([&](size_t i) { return results[i].columns(); }, + [&](size_t i) { return statuses[i]; }, ryw_check, + [&](const Slice& key, PinnableWideColumns* result) { + return txn->GetEntity(read_opts_copy, cfh, key, result); + }); + + txn->Rollback().PermitUncheckedError(); + } else if (FLAGS_use_attribute_group) { // AttributeGroup MultiGetEntity verification + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::ignore_read_error = false; + } + std::vector results; results.reserve(num_keys); for (size_t i = 0; i < num_keys; ++i) { @@ -1209,10 +1317,20 @@ class NonBatchedOpsStressTest : public StressTest { // Compare against non-attribute-group GetEntity result check_results([&](size_t i) { return results[i][0].columns(); }, - [&](size_t i) { return results[i][0].status(); }); + [&](size_t i) { return results[i][0].status(); }, + [](const Slice& /* key */, const WideColumns& /* columns */, + const Status& /* s */) { return true; }, + [&](const Slice& key, PinnableWideColumns* result) { + return db_->GetEntity(read_opts_copy, cfh, key, result); + }); } else { // Non-AttributeGroup MultiGetEntity verification + if (fault_fs_guard) { + fault_fs_guard->EnableErrorInjection(); + SharedState::ignore_read_error = false; + } + std::vector results(num_keys); std::vector statuses(num_keys); @@ -1226,7 +1344,12 @@ class NonBatchedOpsStressTest : public StressTest { } check_results([&](size_t i) { return results[i].columns(); }, - [&](size_t i) { return statuses[i]; }); + [&](size_t i) { return statuses[i]; }, + [](const Slice& /* key */, const WideColumns& /* columns */, + const Status& /* s */) { return true; }, + [&](const Slice& key, PinnableWideColumns* result) { + return db_->GetEntity(read_opts_copy, cfh, key, result); + }); } }