MultiCFSnapshot for NewIterators() API (#12573)

Summary:
As mentioned in https://github.com/facebook/rocksdb/issues/12561 and https://github.com/facebook/rocksdb/issues/12566 , `NewIterators()` API has not been providing consistent view of the db across multiple column families. This PR addresses it by utilizing `MultiCFSnapshot()` function which has been used for `MultiGet()` APIs. To be able to obtain the thread-local super version with ref, `sv_exclusive_access` parameter has been added to `MultiCFSnapshot()` so that we could call `GetReferencedSuperVersion()` or `GetAndRefSuperVersion()` depending on the param and support `Refresh()` API for MultiCfIterators

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12573

Test Plan:
**Unit Tests Added**

```
./db_iterator_test --gtest_filter="*IteratorsConsistentView*"
```
```
./multi_cf_iterator_test -- --gtest_filter="*ConsistentView*"
```

**Performance Check**

Setup
```
make -j64 release
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=10000000 -compression_type=none
```

Run
```
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="multireadrandom" -cache_size=10485760000
```
Before the change
```
DB path: [/dev/shm/db_bench/dbbench]
multireadrandom :       6.374 micros/op 156892 ops/sec 6.374 seconds 1000000 operations; (0 of 1000000 found)
```
After the change
```
DB path: [/dev/shm/db_bench/dbbench]
multireadrandom :       6.265 micros/op 159627 ops/sec 6.265 seconds 1000000 operations; (0 of 1000000 found)
```

Reviewed By: jowlyzhang

Differential Revision: D56444066

Pulled By: jaykorean

fbshipit-source-id: 327ce73c072da30c221e18d4f3389f49115b8f99
This commit is contained in:
Jay Huh 2024-04-24 15:28:55 -07:00 committed by Facebook GitHub Bot
parent 6807da0b44
commit 1fca175eec
6 changed files with 338 additions and 103 deletions

View File

@ -1413,7 +1413,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
// After MultiGet refs a couple of CFs, flush all CFs so MultiGet
// is forced to repeat the process
@ -1513,9 +1513,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
int retries = 0;
bool last_try = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::LastTry", [&](void* /*arg*/) { last_try = true; });
"DBImpl::MultiCFSnapshot::LastTry",
[&](void* /*arg*/) { last_try = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (last_try) {
return;
}
@ -1531,10 +1532,10 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::MultiGet::AfterLastTryRefSV",
{"DBImpl::MultiCFSnapshot::AfterLastTryRefSV",
"DBMultiGetTestWithParam::MultiGetMultiCFMutex:BeforeCreateSV"},
{"DBMultiGetTestWithParam::MultiGetMultiCFMutex:AfterCreateSV",
"DBImpl::MultiGet::BeforeLastTryUnRefSV"},
"DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV"},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -1600,7 +1601,7 @@ TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
int get_sv_count = 0;
ROCKSDB_NAMESPACE::DBImpl* db = static_cast_with_check<DBImpl>(db_);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (++get_sv_count == 2) {
for (int i = 0; i < 8; ++i) {
ASSERT_OK(Flush(i));

View File

@ -2521,7 +2521,7 @@ template <class T, typename IterDerefFuncType>
Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
SequenceNumber* snapshot,
bool extra_sv_ref, SequenceNumber* snapshot,
bool* sv_from_thread_local) {
PERF_TIMER_GUARD(get_snapshot_time);
@ -2539,7 +2539,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
SuperVersion* super_version = node->super_version;
ColumnFamilyData* cfd = node->cfd;
if (super_version != nullptr) {
if (*sv_from_thread_local) {
if (*sv_from_thread_local && !extra_sv_ref) {
ReturnAndCleanupSuperVersion(cfd, super_version);
} else {
CleanupSuperVersion(super_version);
@ -2555,7 +2555,11 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
// super version
auto cf_iter = cf_list->begin();
auto node = iter_deref_func(cf_iter);
node->super_version = GetAndRefSuperVersion(node->cfd);
if (extra_sv_ref) {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
} else {
node->super_version = GetAndRefSuperVersion(node->cfd);
}
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
@ -2602,7 +2606,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
}
if (read_options.snapshot == nullptr) {
if (last_try) {
TEST_SYNC_POINT("DBImpl::MultiGet::LastTry");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::LastTry");
// We're close to max number of retries. For the last retry,
// acquire the lock so we're sure to succeed
mutex_.Lock();
@ -2617,11 +2621,15 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
++cf_iter) {
auto node = iter_deref_func(cf_iter);
if (!last_try) {
node->super_version = GetAndRefSuperVersion(node->cfd);
if (extra_sv_ref) {
node->super_version = node->cfd->GetReferencedSuperVersion(this);
} else {
node->super_version = GetAndRefSuperVersion(node->cfd);
}
} else {
node->super_version = node->cfd->GetSuperVersion()->Ref();
}
TEST_SYNC_POINT("DBImpl::MultiGet::AfterRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterRefSV");
if (check_read_ts) {
s = FailIfReadCollapsedHistory(node->cfd, node->super_version,
*(read_options.timestamp));
@ -2658,7 +2666,7 @@ Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
if (!retry) {
if (last_try) {
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::MultiGet::AfterLastTryRefSV");
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::AfterLastTryRefSV");
}
break;
}
@ -2772,8 +2780,8 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>
key_range_per_cf;
autovector<ColumnFamilyDataSuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cfd_sv_pairs;
autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cf_sv_pairs;
size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
@ -2781,25 +2789,26 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) {
key_range_per_cf.emplace_back(cf_start, i - cf_start);
cfd_sv_pairs.emplace_back(cf, nullptr);
cf_sv_pairs.emplace_back(cf, nullptr);
cf_start = i;
cf = key_ctx->column_family;
}
}
key_range_per_cf.emplace_back(cf_start, num_keys - cf_start);
cfd_sv_pairs.emplace_back(cf, nullptr);
cf_sv_pairs.emplace_back(cf, nullptr);
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<autovector<ColumnFamilyDataSuperVersionPair,
bool sv_from_thread_local = false;
Status s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr,
[](autovector<ColumnFamilyDataSuperVersionPair,
[](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
&cf_sv_pairs,
/* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
for (size_t i = 0; i < num_keys; ++i) {
@ -2817,20 +2826,20 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
read_callback = &timestamp_read_callback;
}
assert(key_range_per_cf.size() == cfd_sv_pairs.size());
assert(key_range_per_cf.size() == cf_sv_pairs.size());
auto key_range_per_cf_iter = key_range_per_cf.begin();
auto cfd_sv_pair_iter = cfd_sv_pairs.begin();
auto cf_sv_pair_iter = cf_sv_pairs.begin();
while (key_range_per_cf_iter != key_range_per_cf.end() &&
cfd_sv_pair_iter != cfd_sv_pairs.end()) {
cf_sv_pair_iter != cf_sv_pairs.end()) {
s = MultiGetImpl(read_options, key_range_per_cf_iter->start,
key_range_per_cf_iter->num_keys, &sorted_keys,
cfd_sv_pair_iter->super_version, consistent_seqnum,
cf_sv_pair_iter->super_version, consistent_seqnum,
read_callback);
if (!s.ok()) {
break;
}
++key_range_per_cf_iter;
++cfd_sv_pair_iter;
++cf_sv_pair_iter;
}
if (!s.ok()) {
assert(s.IsTimedOut() || s.IsAborted());
@ -2845,12 +2854,12 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
}
}
for (const auto& cfd_sv_pair : cfd_sv_pairs) {
for (const auto& cf_sv_pair : cf_sv_pairs) {
if (sv_from_thread_local) {
ReturnAndCleanupSuperVersion(cfd_sv_pair.cfd, cfd_sv_pair.super_version);
ReturnAndCleanupSuperVersion(cf_sv_pair.cfd, cf_sv_pair.super_version);
} else {
TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV");
CleanupSuperVersion(cfd_sv_pair.super_version);
TEST_SYNC_POINT("DBImpl::MultiCFSnapshot::BeforeLastTryUnRefSV");
CleanupSuperVersion(cf_sv_pair.super_version);
}
}
}
@ -2982,17 +2991,18 @@ void DBImpl::MultiGetWithCallbackImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
std::array<ColumnFamilyDataSuperVersionPair, 1> cfd_sv_pairs;
cfd_sv_pairs[0] = ColumnFamilyDataSuperVersionPair(column_family, nullptr);
std::array<ColumnFamilySuperVersionPair, 1> cf_sv_pairs;
cf_sv_pairs[0] = ColumnFamilySuperVersionPair(column_family, nullptr);
size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<std::array<ColumnFamilyDataSuperVersionPair, 1>>(
bool sv_from_thread_local = false;
Status s = MultiCFSnapshot<std::array<ColumnFamilySuperVersionPair, 1>>(
read_options, callback,
[](std::array<ColumnFamilyDataSuperVersionPair, 1>::iterator& cf_iter) {
[](std::array<ColumnFamilySuperVersionPair, 1>::iterator& cf_iter) {
return &(*cf_iter);
},
&cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
&cf_sv_pairs,
/* extra_sv_ref */ false, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
return;
}
@ -3031,11 +3041,11 @@ void DBImpl::MultiGetWithCallbackImpl(
}
s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
cfd_sv_pairs[0].super_version, consistent_seqnum,
cf_sv_pairs[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(cfd_sv_pairs[0].cfd,
cfd_sv_pairs[0].super_version);
ReturnAndCleanupSuperVersion(cf_sv_pairs[0].cfd,
cf_sv_pairs[0].super_version);
}
// The actual implementation of batched MultiGet. Parameters -
@ -3817,69 +3827,62 @@ Status DBImpl::NewIterators(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
if (read_options.timestamp) {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
if (!s.ok()) {
return s;
}
}
} else {
for (auto* cf : column_families) {
assert(cf);
const Status s = FailIfCfHasTs(cf);
if (!s.ok()) {
return s;
}
}
}
autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cf_sv_pairs;
Status s;
for (auto* cf : column_families) {
assert(cf);
if (read_options.timestamp) {
s = FailIfTsMismatchCf(cf, *(read_options.timestamp));
} else {
s = FailIfCfHasTs(cf);
}
if (!s.ok()) {
return s;
}
cf_sv_pairs.emplace_back(cf, nullptr);
}
iterators->clear();
iterators->reserve(column_families.size());
autovector<std::tuple<ColumnFamilyHandleImpl*, SuperVersion*>> cfh_to_sv;
const bool check_read_ts =
read_options.timestamp && read_options.timestamp->size() > 0;
for (auto cf : column_families) {
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(cf);
auto cfd = cfh->cfd();
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
cfh_to_sv.emplace_back(cfh, sv);
if (check_read_ts) {
const Status s =
FailIfReadCollapsedHistory(cfd, sv, *(read_options.timestamp));
if (!s.ok()) {
for (auto prev_entry : cfh_to_sv) {
CleanupSuperVersion(std::get<1>(prev_entry));
}
return s;
}
}
}
assert(cfh_to_sv.size() == column_families.size());
if (read_options.tailing) {
for (auto [cfh, sv] : cfh_to_sv) {
auto iter = new ForwardIterator(this, read_options, cfh->cfd(), sv,
/* allow_unprepared_value */ true);
iterators->push_back(NewDBIterator(
env_, read_options, *cfh->cfd()->ioptions(), sv->mutable_cf_options,
cfh->cfd()->user_comparator(), iter, sv->current, kMaxSequenceNumber,
sv->mutable_cf_options.max_sequential_skip_in_iterations,
nullptr /*read_callback*/, cfh));
}
} else {
// Note: no need to consider the special case of
// last_seq_same_as_publish_seq_==false since NewIterators is overridden
// in WritePreparedTxnDB
auto snapshot = read_options.snapshot != nullptr
? read_options.snapshot->GetSequenceNumber()
: versions_->LastSequence();
for (auto [cfh, sv] : cfh_to_sv) {
iterators->push_back(NewIteratorImpl(read_options, cfh, sv, snapshot,
nullptr /*read_callback*/));
}
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local = false;
s = MultiCFSnapshot<autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr /* read_callback*/,
[](autovector<ColumnFamilySuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cf_sv_pairs,
/* extra_sv_ref */ true, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
return s;
}
assert(cf_sv_pairs.size() == column_families.size());
if (read_options.tailing) {
for (const auto& cf_sv_pair : cf_sv_pairs) {
auto iter = new ForwardIterator(this, read_options, cf_sv_pair.cfd,
cf_sv_pair.super_version,
/* allow_unprepared_value */ true);
iterators->push_back(
NewDBIterator(env_, read_options, *cf_sv_pair.cfd->ioptions(),
cf_sv_pair.super_version->mutable_cf_options,
cf_sv_pair.cfd->user_comparator(), iter,
cf_sv_pair.super_version->current, kMaxSequenceNumber,
cf_sv_pair.super_version->mutable_cf_options
.max_sequential_skip_in_iterations,
nullptr /*read_callback*/, cf_sv_pair.cfh));
}
} else {
for (const auto& cf_sv_pair : cf_sv_pairs) {
iterators->push_back(NewIteratorImpl(
read_options, cf_sv_pair.cfh, cf_sv_pair.super_version,
consistent_seqnum, nullptr /*read_callback*/));
}
}
return Status::OK();
}

View File

@ -2355,18 +2355,20 @@ class DBImpl : public DB {
// A structure to contain ColumnFamilyData and the SuperVersion obtained for
// the consistent view of DB
struct ColumnFamilyDataSuperVersionPair {
struct ColumnFamilySuperVersionPair {
ColumnFamilyHandleImpl* cfh;
ColumnFamilyData* cfd;
// SuperVersion for the column family obtained in a manner that ensures a
// consistent view across all column families in the DB
SuperVersion* super_version;
ColumnFamilyDataSuperVersionPair(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cfd(static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd()),
ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)),
cfd(cfh->cfd()),
super_version(sv) {}
ColumnFamilyDataSuperVersionPair() = default;
ColumnFamilySuperVersionPair() = default;
};
// A common function to obtain a consistent snapshot, which can be implicit
@ -2380,9 +2382,17 @@ class DBImpl : public DB {
// If callback is non-null, the callback is refreshed with the snapshot
// sequence number
//
// `extra_sv_ref` is used to indicate whether thread-local SuperVersion
// should be obtained with an extra ref (by GetReferencedSuperVersion()) or
// not (by GetAndRefSuperVersion()). For instance, point lookup like MultiGet
// does not require SuperVersion to be re-acquired throughout the entire
// invocation (no need extra ref), while MultiCfIterators may need the
// SuperVersion to be updated during Refresh() (requires extra ref).
//
// `sv_from_thread_local` being set to false indicates that the SuperVersion
// obtained from the ColumnFamilyData, whereas true indicates they are thread
// local.
//
// A non-OK status will be returned if for a column family that enables
// user-defined timestamp feature, the specified `ReadOptions.timestamp`
// attemps to read collapsed history.
@ -2390,7 +2400,8 @@ class DBImpl : public DB {
Status MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
SequenceNumber* snapshot, bool* sv_from_thread_local);
bool extra_sv_ref, SequenceNumber* snapshot,
bool* sv_from_thread_local);
// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number

View File

@ -3555,6 +3555,119 @@ TEST_F(DBIteratorTest, ErrorWhenReadFile) {
iter->Reset();
}
TEST_F(DBIteratorTest, IteratorsConsistentViewImplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Flush(i));
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ReadOptions read_options;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
for (int i = 0; i < 3; ++i) {
auto iter = iters[i];
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
std::to_string(i) + "_val_new");
}
for (auto* iter : iters) {
delete iter;
}
// Thread-local SVs are no longer obsolete nor in use
for (int i = 0; i < 3; ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
}
}
TEST_F(DBIteratorTest, IteratorsConsistentViewExplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2"}, options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Flush(i));
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Explicit snapshot wouldn't force reloading all svs. We should expect old
// values
const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions read_options;
read_options.snapshot = snapshot;
std::vector<Iterator*> iters;
ASSERT_OK(db_->NewIterators(read_options, handles_, &iters));
for (int i = 0; i < 3; ++i) {
auto iter = iters[i];
ASSERT_OK(iter->status());
iter->SeekToFirst();
ASSERT_EQ(IterStatus(iter), "cf" + std::to_string(i) + "_key->cf" +
std::to_string(i) + "_val");
}
db_->ReleaseSnapshot(snapshot);
for (auto* iter : iters) {
delete iter;
}
// Thread-local SV for cf_0 is obsolete (flush happened after the first SV
// Ref)
auto* cfd0 =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[0])->cfd();
ASSERT_EQ(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd0->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
// Rest are not InUse nor Obsolete
for (int i = 1; i < 3; ++i) {
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[i])->cfd();
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -399,6 +399,112 @@ TEST_F(CoalescingIteratorTest, LowerAndUpperBounds) {
}
}
TEST_F(CoalescingIteratorTest, ConsistentViewExplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Flush(i));
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
ReadOptions read_options;
const Snapshot* snapshot = db_->GetSnapshot();
read_options.snapshot = snapshot;
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3);
iter->Seek("");
ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(read_options, cfhs_order_0_1_2_3);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("cf2_key");
ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val");
}
db_->ReleaseSnapshot(snapshot);
}
TEST_F(CoalescingIteratorTest, ConsistentViewImplicitSnapshot) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"cf_1", "cf_2", "cf_3"}, options);
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val"));
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkFlush:done",
"DBImpl::MultiCFSnapshot::AfterGetSeqNum1"}});
bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::MultiCFSnapshot::AfterRefSV", [&](void* /*arg*/) {
if (!flushed) {
for (int i = 0; i < 4; ++i) {
ASSERT_OK(Flush(i));
ASSERT_OK(Put(i, "cf" + std::to_string(i) + "_key",
"cf" + std::to_string(i) + "_val_new"));
}
flushed = true;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
std::vector<ColumnFamilyHandle*> cfhs_order_0_1_2_3 = {
handles_[0], handles_[1], handles_[2], handles_[3]};
// Verify Seek()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->Seek("cf2_key");
ASSERT_EQ(IterStatus(iter.get()), "cf2_key->cf2_val_new");
iter->Next();
ASSERT_EQ(IterStatus(iter.get()), "cf3_key->cf3_val_new");
}
// Verify SeekForPrev()
{
std::unique_ptr<Iterator> iter =
db_->NewCoalescingIterator(ReadOptions(), cfhs_order_0_1_2_3);
iter->SeekForPrev("");
ASSERT_EQ(IterStatus(iter.get()), "(invalid)");
iter->SeekForPrev("cf1_key");
ASSERT_EQ(IterStatus(iter.get()), "cf1_key->cf1_val_new");
iter->Prev();
ASSERT_EQ(IterStatus(iter.get()), "cf0_key->cf0_val_new");
}
}
TEST_F(CoalescingIteratorTest, EmptyCfs) {
Options options = GetDefaultOptions();
{

View File

@ -0,0 +1 @@
Provide consistent view of the database across the column families for `NewIterators()` API.