mirror of https://github.com/facebook/rocksdb.git
Factor out the RYW transaction building logic into a helper (#12697)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/12697 As groundwork for stress testing `Transaction::MultiGetEntity`, the patch factors out the logic for adding transactional writes for some of the keys in a `MultiGet` batch into a separate helper method called `MaybeAddKeyToTxnForRYW`. Reviewed By: jowlyzhang Differential Revision: D57791830 fbshipit-source-id: ef347ba6e6e82dfe5cedb4cf67dd6d1503901d89
This commit is contained in:
parent
fecb10c2fa
commit
bd801bd98c
|
@ -613,6 +613,7 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
std::unordered_map<std::string, ExpectedValue> ryw_expected_values;
|
||||
|
||||
SharedState* shared = thread->shared;
|
||||
assert(shared);
|
||||
|
||||
int column_family = rand_column_families[0];
|
||||
ColumnFamilyHandle* cfh = column_families_[column_family];
|
||||
|
@ -649,7 +650,7 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
Status s = NewTxn(wo, &txn);
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str());
|
||||
thread->shared->SafeTerminate();
|
||||
shared->SafeTerminate();
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < num_keys; ++i) {
|
||||
|
@ -657,61 +658,8 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
key_str.emplace_back(Key(rand_key));
|
||||
keys.emplace_back(key_str.back());
|
||||
if (use_txn) {
|
||||
if (!shared->AllowsOverwrite(rand_key) &&
|
||||
shared->Exists(column_family, rand_key)) {
|
||||
// Just do read your write checks for keys that allow overwrites.
|
||||
continue;
|
||||
}
|
||||
// With a 1 in 10 probability, insert the just added key in the batch
|
||||
// into the transaction. This will create an overlap with the MultiGet
|
||||
// keys and exercise some corner cases in the code
|
||||
if (thread->rand.OneIn(10)) {
|
||||
enum class Op {
|
||||
Put,
|
||||
Merge,
|
||||
Delete,
|
||||
// add new operations above this line
|
||||
NumberOfOps
|
||||
};
|
||||
|
||||
const Op op = static_cast<Op>(
|
||||
thread->rand.Uniform(static_cast<int>(Op::NumberOfOps)));
|
||||
|
||||
Status s;
|
||||
assert(txn);
|
||||
switch (op) {
|
||||
case Op::Put:
|
||||
case Op::Merge: {
|
||||
ExpectedValue put_value;
|
||||
put_value.Put(false /* pending */);
|
||||
ryw_expected_values[key_str[i]] = put_value;
|
||||
char value[100];
|
||||
size_t sz =
|
||||
GenerateValue(put_value.GetValueBase(), value, sizeof(value));
|
||||
Slice v(value, sz);
|
||||
if (op == Op::Put) {
|
||||
s = txn->Put(cfh, keys.back(), v);
|
||||
} else {
|
||||
s = txn->Merge(cfh, keys.back(), v);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Op::Delete: {
|
||||
ExpectedValue delete_value;
|
||||
delete_value.Delete(false /* pending */);
|
||||
ryw_expected_values[key_str[i]] = delete_value;
|
||||
s = txn->Delete(cfh, keys.back());
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(false);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "Transaction write error in TestMultiGet: %s\n",
|
||||
s.ToString().c_str());
|
||||
thread->shared->SafeTerminate();
|
||||
}
|
||||
}
|
||||
MaybeAddKeyToTxnForRYW(thread, column_family, rand_key, txn.get(),
|
||||
ryw_expected_values);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -742,7 +690,7 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
if (stat_nok < error_count) {
|
||||
// Grab mutex so multiple thread don't try to print the
|
||||
// stack trace at the same time
|
||||
MutexLock l(thread->shared->GetMutex());
|
||||
MutexLock l(shared->GetMutex());
|
||||
fprintf(stderr, "Didn't get expected error from MultiGet. \n");
|
||||
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
|
||||
error_count, stat_nok);
|
||||
|
@ -877,13 +825,13 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
|
||||
thread->stats.AddErrors(1);
|
||||
// Fail fast to preserve the DB state
|
||||
thread->shared->SetVerificationFailure();
|
||||
shared->SetVerificationFailure();
|
||||
return false;
|
||||
} else if (!is_ryw_correct) {
|
||||
fprintf(stderr, "TestMultiGet error: is_ryw_correct is false\n");
|
||||
thread->stats.AddErrors(1);
|
||||
// Fail fast to preserve the DB state
|
||||
thread->shared->SetVerificationFailure();
|
||||
shared->SetVerificationFailure();
|
||||
return false;
|
||||
} else if (s.ok()) {
|
||||
// found case
|
||||
|
@ -2390,6 +2338,83 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
return !shared->AllowsOverwrite(key_num);
|
||||
};
|
||||
}
|
||||
|
||||
void MaybeAddKeyToTxnForRYW(
|
||||
ThreadState* thread, int column_family, int64_t key, Transaction* txn,
|
||||
std::unordered_map<std::string, ExpectedValue>& ryw_expected_values) {
|
||||
assert(thread);
|
||||
assert(txn);
|
||||
|
||||
SharedState* const shared = thread->shared;
|
||||
assert(shared);
|
||||
|
||||
if (!shared->AllowsOverwrite(key) && shared->Exists(column_family, key)) {
|
||||
// Just do read your write checks for keys that allow overwrites.
|
||||
return;
|
||||
}
|
||||
|
||||
// With a 1 in 10 probability, insert the just added key in the batch
|
||||
// into the transaction. This will create an overlap with the MultiGet
|
||||
// keys and exercise some corner cases in the code
|
||||
if (thread->rand.OneIn(10)) {
|
||||
ColumnFamilyHandle* const cfh = column_families_[column_family];
|
||||
assert(cfh);
|
||||
|
||||
const std::string k = Key(key);
|
||||
|
||||
enum class Op {
|
||||
Put,
|
||||
Merge,
|
||||
Delete,
|
||||
// add new operations above this line
|
||||
NumberOfOps
|
||||
};
|
||||
|
||||
const Op op = static_cast<Op>(
|
||||
thread->rand.Uniform(static_cast<int>(Op::NumberOfOps)));
|
||||
|
||||
Status s;
|
||||
|
||||
switch (op) {
|
||||
case Op::Put:
|
||||
case Op::Merge: {
|
||||
ExpectedValue put_value;
|
||||
put_value.Put(false /* pending */);
|
||||
ryw_expected_values[k] = put_value;
|
||||
|
||||
char value[100];
|
||||
size_t sz =
|
||||
GenerateValue(put_value.GetValueBase(), value, sizeof(value));
|
||||
const Slice v(value, sz);
|
||||
|
||||
if (op == Op::Put) {
|
||||
s = txn->Put(cfh, k, v);
|
||||
} else {
|
||||
s = txn->Merge(cfh, k, v);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case Op::Delete: {
|
||||
ExpectedValue delete_value;
|
||||
delete_value.Delete(false /* pending */);
|
||||
ryw_expected_values[k] = delete_value;
|
||||
|
||||
s = txn->Delete(cfh, k);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(false);
|
||||
}
|
||||
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr,
|
||||
"Transaction write error in read-your-own-write test: %s\n",
|
||||
s.ToString().c_str());
|
||||
shared->SafeTerminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
StressTest* CreateNonBatchedOpsStressTest() {
|
||||
|
|
Loading…
Reference in New Issue