Add AttributeGroupIterator to Stress Test (#12776)

Summary:
As title. Changes include the following
- `Refresh()` moved from `Iterator` interface to `IteratorBase` so that `AttributeGroupIterator` can also have Refresh() API (implemention will be added in the future PR)
- `TestIterate()`'s main logic refactored into `TestIterateImpl()` so that it can be shared with `TestIterateAttributeGroups()`
- `VerifyIterator()` also changed so that verification code can be shared between `Iterator` and `AttributeGroupIterator`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12776

Test Plan:
Single CF Iterator
```
python3 tools/db_crashtest.py blackbox --simple --max_key=25000000 --write_buffer_size=4194304 --use_attribute_group=0 --use_put_entity_one_in=1 --use_multi_get=1 --use_multi_cf_iterator=0 --verify_iterator_with_expected_state_one_in=2
```

CoalescingIterator
```
python3 tools/db_crashtest.py blackbox --simple --max_key=25000000 --write_buffer_size=4194304 --use_attribute_group=0 --use_put_entity_one_in=1 --use_multi_get=1 --use_multi_cf_iterator=1 --verify_iterator_with_expected_state_one_in=2
```

AttributeGroupIterator
```
python3 tools/db_crashtest.py blackbox --simple --max_key=25000000 --write_buffer_size=4194304 --use_attribute_group=1 --use_put_entity_one_in=1 --use_multi_get=1 --use_multi_cf_iterator=1 --verify_iterator_with_expected_state_one_in=2
```

Reviewed By: cbi42

Differential Revision: D58626165

Pulled By: jaykorean

fbshipit-source-id: 3e0a6ff72e51ecef9e06b65acfa53605a24d742e
This commit is contained in:
Jay Huh 2024-06-17 11:25:30 -07:00 committed by Facebook GitHub Bot
parent 3758e31f3f
commit b8c9a2576a
6 changed files with 130 additions and 50 deletions

View File

@ -395,6 +395,16 @@ bool VerifyWideColumns(const WideColumns& columns) {
return VerifyWideColumns(value_of_default, columns);
}
bool VerifyIteratorAttributeGroups(
const IteratorAttributeGroups& attribute_groups) {
for (const auto& attribute_group : attribute_groups) {
if (!VerifyWideColumns(attribute_group.columns())) {
return false;
}
}
return true;
}
std::string GetNowNanos() {
uint64_t t = db_stress_env->NowNanos();
std::string ret;

View File

@ -764,6 +764,8 @@ WideColumns GenerateExpectedWideColumns(uint32_t value_base,
const Slice& slice);
bool VerifyWideColumns(const Slice& value, const WideColumns& columns);
bool VerifyWideColumns(const WideColumns& columns);
bool VerifyIteratorAttributeGroups(
const IteratorAttributeGroups& attribute_groups);
AttributeGroups GenerateAttributeGroups(
const std::vector<ColumnFamilyHandle*>& cfhs, uint32_t value_base,

View File

@ -1291,7 +1291,12 @@ void StressTest::OperateDb(ThreadState* thread) {
ThreadStatusUtil::SetEnableTracking(FLAGS_enable_thread_tracking);
ThreadStatusUtil::SetThreadOperation(
ThreadStatus::OperationType::OP_DBITERATOR);
if (FLAGS_use_multi_cf_iterator && FLAGS_use_attribute_group) {
TestIterateAttributeGroups(thread, read_opts, rand_column_families,
rand_keys);
} else {
TestIterate(thread, read_opts, rand_column_families, rand_keys);
}
ThreadStatusUtil::ResetThreadStatus();
}
} else {
@ -1375,6 +1380,75 @@ Status StressTest::TestIterate(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
auto new_iter_func = [&rand_column_families, this](const ReadOptions& ro) {
if (FLAGS_use_multi_cf_iterator) {
std::vector<ColumnFamilyHandle*> cfhs;
cfhs.reserve(rand_column_families.size());
for (auto cf_index : rand_column_families) {
cfhs.emplace_back(column_families_[cf_index]);
}
assert(!cfhs.empty());
return db_->NewCoalescingIterator(ro, cfhs);
} else {
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
return std::unique_ptr<Iterator>(db_->NewIterator(ro, cfh));
}
};
auto verify_func = [](Iterator* iter) {
if (!VerifyWideColumns(iter->value(), iter->columns())) {
fprintf(stderr,
"Value and columns inconsistent for iterator: value: %s, "
"columns: %s\n",
iter->value().ToString(/* hex */ true).c_str(),
WideColumnsToHex(iter->columns()).c_str());
return false;
}
return true;
};
return TestIterateImpl<Iterator>(thread, read_opts, rand_column_families,
rand_keys, new_iter_func, verify_func);
}
Status StressTest::TestIterateAttributeGroups(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
auto new_iter_func = [&rand_column_families, this](const ReadOptions& ro) {
assert(FLAGS_use_multi_cf_iterator);
std::vector<ColumnFamilyHandle*> cfhs;
cfhs.reserve(rand_column_families.size());
for (auto cf_index : rand_column_families) {
cfhs.emplace_back(column_families_[cf_index]);
}
assert(!cfhs.empty());
return db_->NewAttributeGroupIterator(ro, cfhs);
};
auto verify_func = [](AttributeGroupIterator* iter) {
if (!VerifyIteratorAttributeGroups(iter->attribute_groups())) {
// TODO - print out attribute group values
fprintf(stderr,
"one of the columns in the attribute groups inconsistent for "
"iterator\n");
return false;
}
return true;
};
return TestIterateImpl<AttributeGroupIterator>(
thread, read_opts, rand_column_families, rand_keys, new_iter_func,
verify_func);
}
template <typename IterType, typename NewIterFunc, typename VerifyFunc>
Status StressTest::TestIterateImpl(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
NewIterFunc new_iter_func,
VerifyFunc verify_func) {
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
@ -1425,21 +1499,7 @@ Status StressTest::TestIterate(ThreadState* thread,
ro.iterate_lower_bound = &lower_bound;
}
std::unique_ptr<Iterator> iter;
if (FLAGS_use_multi_cf_iterator) {
std::vector<ColumnFamilyHandle*> cfhs;
cfhs.reserve(rand_column_families.size());
for (auto cf_index : rand_column_families) {
cfhs.emplace_back(column_families_[cf_index]);
}
assert(!cfhs.empty());
iter = db_->NewCoalescingIterator(ro, cfhs);
} else {
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
iter = std::unique_ptr<Iterator>(db_->NewIterator(ro, cfh));
}
std::unique_ptr<IterType> iter = new_iter_func(ro);
std::vector<std::string> key_strs;
if (thread->rand.OneIn(16)) {
@ -1541,7 +1601,7 @@ Status StressTest::TestIterate(ThreadState* thread,
}
VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
key, op_logs, &diverged);
key, op_logs, verify_func, &diverged);
const bool no_reverse =
(FLAGS_memtablerep == "prefix_hash" && !expect_total_order);
@ -1565,7 +1625,7 @@ Status StressTest::TestIterate(ThreadState* thread,
last_op = kLastOpNextOrPrev;
VerifyIterator(thread, cmp_cfh, ro, iter.get(), cmp_iter.get(), last_op,
key, op_logs, &diverged);
key, op_logs, verify_func, &diverged);
}
thread->stats.AddIterations(1);
@ -1622,12 +1682,11 @@ Status StressTest::VerifyGetCurrentWalFile() const {
// Will flag failure if the verification fails.
// diverged = true if the two iterator is already diverged.
// True if verification passed, false if not.
void StressTest::VerifyIterator(ThreadState* thread,
ColumnFamilyHandle* cmp_cfh,
const ReadOptions& ro, Iterator* iter,
Iterator* cmp_iter, LastIterateOp op,
const Slice& seek_key,
const std::string& op_logs, bool* diverged) {
template <typename IterType, typename VerifyFuncType>
void StressTest::VerifyIterator(
ThreadState* thread, ColumnFamilyHandle* cmp_cfh, const ReadOptions& ro,
IterType* iter, Iterator* cmp_iter, LastIterateOp op, const Slice& seek_key,
const std::string& op_logs, VerifyFuncType verify_func, bool* diverged) {
assert(diverged);
if (*diverged) {
@ -1783,17 +1842,10 @@ void StressTest::VerifyIterator(ThreadState* thread,
}
if (!*diverged && iter->Valid()) {
if (!VerifyWideColumns(iter->value(), iter->columns())) {
fprintf(stderr,
"Value and columns inconsistent for iterator: value: %s, "
"columns: %s\n",
iter->value().ToString(/* hex */ true).c_str(),
WideColumnsToHex(iter->columns()).c_str());
if (!verify_func(iter)) {
*diverged = true;
}
}
if (*diverged) {
fprintf(stderr, "VerifyIterator failed. Control CF %s\n",
cmp_cfh->GetName().c_str());

View File

@ -169,6 +169,20 @@ class StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
// Given a key K, this creates an attribute group iterator which scans to K
// and then does a random sequence of Next/Prev operations. Called only when
// use_attribute_group=1
virtual Status TestIterateAttributeGroups(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
template <typename IterType, typename NewIterFunc, typename VerifyFunc>
Status TestIterateImpl(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
NewIterFunc new_iter_func, VerifyFunc verify_func);
virtual Status TestIterateAgainstExpected(
ThreadState* /* thread */, const ReadOptions& /* read_opts */,
const std::vector<int>& /* rand_column_families */,
@ -192,10 +206,12 @@ class StressTest {
// diverged = true if the two iterator is already diverged.
// True if verification passed, false if not.
// op_logs is the information to print when validation fails.
template <typename IterType, typename VerifyFuncType>
void VerifyIterator(ThreadState* thread, ColumnFamilyHandle* cmp_cfh,
const ReadOptions& ro, Iterator* iter, Iterator* cmp_iter,
const ReadOptions& ro, IterType* iter, Iterator* cmp_iter,
LastIterateOp op, const Slice& seek_key,
const std::string& op_logs, bool* diverged);
const std::string& op_logs, VerifyFuncType verifyFunc,
bool* diverged);
virtual Status TestBackupRestore(ThreadState* thread,
const std::vector<int>& rand_column_families,

View File

@ -55,22 +55,6 @@ class Iterator : public IteratorBase {
return kNoWideColumns;
}
// If supported, the DB state that the iterator reads from is updated to
// the latest state. The iterator will be invalidated after the call.
// Regardless of whether the iterator was created/refreshed previously
// with or without a snapshot, the iterator will be reading the
// latest DB state after this call.
// Note that you will need to call a Seek*() function to get the iterator
// back into a valid state before calling a function that assumes the
// state is already valid, like Next().
virtual Status Refresh() { return Refresh(nullptr); }
// Similar to Refresh() but the iterator will be reading the latest DB state
// under the given snapshot.
virtual Status Refresh(const class Snapshot*) {
return Status::NotSupported("Refresh() is not supported");
}
// Property "rocksdb.iterator.is-key-pinned":
// If returning "1", this means that the Slice returned by key() is valid
// as long as the iterator is not deleted.

View File

@ -58,6 +58,22 @@ class IteratorBase : public Cleanable {
// REQUIRES: Valid()
virtual void Prev() = 0;
// If supported, the DB state that the iterator reads from is updated to
// the latest state. The iterator will be invalidated after the call.
// Regardless of whether the iterator was created/refreshed previously
// with or without a snapshot, the iterator will be reading the
// latest DB state after this call.
// Note that you will need to call a Seek*() function to get the iterator
// back into a valid state before calling a function that assumes the
// state is already valid, like Next().
virtual Status Refresh() { return Refresh(nullptr); }
// Similar to Refresh() but the iterator will be reading the latest DB state
// under the given snapshot.
virtual Status Refresh(const class Snapshot*) {
return Status::NotSupported("Refresh() is not supported");
}
// Return the key for the current entry. The underlying storage for
// the returned slice is valid only until the next modification of the
// iterator (i.e. the next SeekToFirst/SeekToLast/Seek/SeekForPrev/Next/Prev