MultiCFSnapshot Refactor - separate multiget key range info from CFD & superversion info (#12561)

Summary:
While implementing MultiCFIterators (CoalescingIterator and AttributeGroupIterator), we found that the existing `NewIterators()` API does not ensure a uniform view of the DB across all column families. The `NewIterators()` function is utilized to generate child iterators for the MultiCfIterators, and it's expected that all child iterators maintain a consistent view of the DB.

For example, within the loop where the super version for each CF is being obtained, if a CF undergoes compaction after the super versions for previous CFs have already been retrieved, we lose the consistency in the view of the CFs for the iterators due to the API not under a db mutex.

This preliminary refactoring of `MultiCFSnapshot` aims to address this issue in the `NewIterators()` API in the later PR. Currently, `MultiCFSnapshot` is used to achieve a consistent view across CFs in `MultiGet`. The `MultiGetColumnFamilyData` contains MultiGet-specific information that can be decoupled from the cfd and sv, allowing `MultiCFSnapshot` to be used in other places.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12561

Test Plan:
**Existing Unit Tests for `MultiCFSnapshot()`**

```
./db_basic_test -- --gtest_filter="*MultiGet*"
```

**Performance Test**

Setup
```
make -j64 release

TEST_TMPDIR=/dev/shm/db_bench ./db_bench -benchmarks="filluniquerandom" -key_size=32 -value_size=512 -num=10000000 -compression_type=none
```
Run
```
TEST_TMPDIR=/dev/shm/db_bench ./db_bench -use_existing_db=1 -benchmarks="multireadrandom" -cache_size=10485760000
```
Before the change
```
DB path: [/dev/shm/db_bench/dbbench]
multireadrandom :       4.760 micros/op 210072 ops/sec 4.760 seconds 1000000 operations; (0 of 1000000 found)
```

After the change
```
DB path: [/dev/shm/db_bench/dbbench]
multireadrandom :       4.593 micros/op 217727 ops/sec 4.593 seconds 1000000 operations; (0 of 1000000 found)
```

Reviewed By: anand1976

Differential Revision: D56309422

Pulled By: jaykorean

fbshipit-source-id: 7a9164d12c810b6c2d2db062827fcc4a36cbc77b
This commit is contained in:
Jay Huh 2024-04-18 20:11:01 -07:00 committed by Facebook GitHub Bot
parent 97991960e9
commit 909ff2c208
2 changed files with 75 additions and 75 deletions

View File

@ -2517,12 +2517,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
return s;
}
template <class T>
Status DBImpl::MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local) {
template <class T, typename IterDerefFuncType>
Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
SequenceNumber* snapshot,
bool* sv_from_thread_local) {
PERF_TIMER_GUARD(get_snapshot_time);
assert(sv_from_thread_local);
@ -2770,37 +2770,36 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
}
PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys);
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>
multiget_cf_data;
autovector<MultiGetKeyRangePerCf, MultiGetContext::MAX_BATCH_SIZE>
key_range_per_cf;
autovector<ColumnFamilyDataSuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>
cfd_sv_pairs;
size_t cf_start = 0;
ColumnFamilyHandle* cf = sorted_keys[0]->column_family;
for (size_t i = 0; i < num_keys; ++i) {
KeyContext* key_ctx = sorted_keys[i];
if (key_ctx->column_family != cf) {
multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr);
key_range_per_cf.emplace_back(cf_start, i - cf_start);
cfd_sv_pairs.emplace_back(cf, nullptr);
cf_start = i;
cf = key_ctx->column_family;
}
}
multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr);
key_range_per_cf.emplace_back(cf_start, num_keys - cf_start);
cfd_sv_pairs.emplace_back(cf, nullptr);
std::function<MultiGetColumnFamilyData*(
autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator&)>
iter_deref_lambda =
[](autovector<MultiGetColumnFamilyData,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
};
SequenceNumber consistent_seqnum;
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<
autovector<MultiGetColumnFamilyData, MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum, &sv_from_thread_local);
Status s = MultiCFSnapshot<autovector<ColumnFamilyDataSuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>>(
read_options, nullptr,
[](autovector<ColumnFamilyDataSuperVersionPair,
MultiGetContext::MAX_BATCH_SIZE>::iterator& cf_iter) {
return &(*cf_iter);
},
&cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
for (size_t i = 0; i < num_keys; ++i) {
@ -2818,31 +2817,40 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options,
read_callback = &timestamp_read_callback;
}
auto cf_iter = multiget_cf_data.begin();
for (; cf_iter != multiget_cf_data.end(); ++cf_iter) {
s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys,
&sorted_keys, cf_iter->super_version, consistent_seqnum,
assert(key_range_per_cf.size() == cfd_sv_pairs.size());
auto key_range_per_cf_iter = key_range_per_cf.begin();
auto cfd_sv_pair_iter = cfd_sv_pairs.begin();
while (key_range_per_cf_iter != key_range_per_cf.end() &&
cfd_sv_pair_iter != cfd_sv_pairs.end()) {
s = MultiGetImpl(read_options, key_range_per_cf_iter->start,
key_range_per_cf_iter->num_keys, &sorted_keys,
cfd_sv_pair_iter->super_version, consistent_seqnum,
read_callback);
if (!s.ok()) {
break;
}
++key_range_per_cf_iter;
++cfd_sv_pair_iter;
}
if (!s.ok()) {
assert(s.IsTimedOut() || s.IsAborted());
for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) {
for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys;
for (++key_range_per_cf_iter;
key_range_per_cf_iter != key_range_per_cf.end();
++key_range_per_cf_iter) {
for (size_t i = key_range_per_cf_iter->start;
i < key_range_per_cf_iter->start + key_range_per_cf_iter->num_keys;
++i) {
*sorted_keys[i]->s = s;
}
}
}
for (const auto& iter : multiget_cf_data) {
for (const auto& cfd_sv_pair : cfd_sv_pairs) {
if (sv_from_thread_local) {
ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version);
ReturnAndCleanupSuperVersion(cfd_sv_pair.cfd, cfd_sv_pair.super_version);
} else {
TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV");
CleanupSuperVersion(iter.super_version);
CleanupSuperVersion(cfd_sv_pair.super_version);
}
}
}
@ -2974,21 +2982,17 @@ void DBImpl::MultiGetWithCallbackImpl(
const ReadOptions& read_options, ColumnFamilyHandle* column_family,
ReadCallback* callback,
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys) {
std::array<MultiGetColumnFamilyData, 1> multiget_cf_data;
multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr);
std::function<MultiGetColumnFamilyData*(
std::array<MultiGetColumnFamilyData, 1>::iterator&)>
iter_deref_lambda =
[](std::array<MultiGetColumnFamilyData, 1>::iterator& cf_iter) {
return &(*cf_iter);
};
std::array<ColumnFamilyDataSuperVersionPair, 1> cfd_sv_pairs;
cfd_sv_pairs[0] = ColumnFamilyDataSuperVersionPair(column_family, nullptr);
size_t num_keys = sorted_keys->size();
SequenceNumber consistent_seqnum;
SequenceNumber consistent_seqnum = kMaxSequenceNumber;
bool sv_from_thread_local;
Status s = MultiCFSnapshot<std::array<MultiGetColumnFamilyData, 1>>(
read_options, callback, iter_deref_lambda, &multiget_cf_data,
&consistent_seqnum, &sv_from_thread_local);
Status s = MultiCFSnapshot<std::array<ColumnFamilyDataSuperVersionPair, 1>>(
read_options, callback,
[](std::array<ColumnFamilyDataSuperVersionPair, 1>::iterator& cf_iter) {
return &(*cf_iter);
},
&cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local);
if (!s.ok()) {
return;
}
@ -3027,11 +3031,11 @@ void DBImpl::MultiGetWithCallbackImpl(
}
s = MultiGetImpl(read_options, 0, num_keys, sorted_keys,
multiget_cf_data[0].super_version, consistent_seqnum,
cfd_sv_pairs[0].super_version, consistent_seqnum,
read_callback);
assert(s.ok() || s.IsTimedOut() || s.IsAborted());
ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd,
multiget_cf_data[0].super_version);
ReturnAndCleanupSuperVersion(cfd_sv_pairs[0].cfd,
cfd_sv_pairs[0].super_version);
}
// The actual implementation of batched MultiGet. Parameters -

View File

@ -2337,10 +2337,7 @@ class DBImpl : public DB {
// A structure to hold the information required to process MultiGet of keys
// belonging to one column family. For a multi column family MultiGet, there
// will be a container of these objects.
struct MultiGetColumnFamilyData {
ColumnFamilyHandle* cf;
ColumnFamilyData* cfd;
struct MultiGetKeyRangePerCf {
// For the batched MultiGet which relies on sorted keys, start specifies
// the index of first key belonging to this column family in the sorted
// list.
@ -2350,31 +2347,31 @@ class DBImpl : public DB {
// belonging to this column family in the sorted list
size_t num_keys;
MultiGetKeyRangePerCf() : start(0), num_keys(0) {}
MultiGetKeyRangePerCf(size_t first, size_t count)
: start(first), num_keys(count) {}
};
// A structure to contain ColumnFamilyData and the SuperVersion obtained for
// the consistent view of DB
struct ColumnFamilyDataSuperVersionPair {
ColumnFamilyData* cfd;
// SuperVersion for the column family obtained in a manner that ensures a
// consistent view across all column families in the DB
SuperVersion* super_version;
MultiGetColumnFamilyData(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cf(column_family),
cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
start(0),
num_keys(0),
ColumnFamilyDataSuperVersionPair(ColumnFamilyHandle* column_family,
SuperVersion* sv)
: cfd(static_cast<ColumnFamilyHandleImpl*>(column_family)->cfd()),
super_version(sv) {}
MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first,
size_t count, SuperVersion* sv)
: cf(column_family),
cfd(static_cast<ColumnFamilyHandleImpl*>(cf)->cfd()),
start(first),
num_keys(count),
super_version(sv) {}
MultiGetColumnFamilyData() = default;
ColumnFamilyDataSuperVersionPair() = default;
};
// A common function to obtain a consistent snapshot, which can be implicit
// if the user doesn't specify a snapshot in read_options, across
// multiple column families for MultiGet. It will attempt to get an implicit
// multiple column families. It will attempt to get an implicit
// snapshot without acquiring the db_mutes, but will give up after a few
// tries and acquire the mutex if a memtable flush happens. The template
// allows both the batched and non-batched MultiGet to call this with
@ -2389,12 +2386,11 @@ class DBImpl : public DB {
// A non-OK status will be returned if for a column family that enables
// user-defined timestamp feature, the specified `ReadOptions.timestamp`
// attemps to read collapsed history.
template <class T>
Status MultiCFSnapshot(
const ReadOptions& read_options, ReadCallback* callback,
std::function<MultiGetColumnFamilyData*(typename T::iterator&)>&
iter_deref_func,
T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local);
template <class T, typename IterDerefFuncType>
Status MultiCFSnapshot(const ReadOptions& read_options,
ReadCallback* callback,
IterDerefFuncType iter_deref_func, T* cf_list,
SequenceNumber* snapshot, bool* sv_from_thread_local);
// The actual implementation of the batching MultiGet. The caller is expected
// to have acquired the SuperVersion and pass in a snapshot sequence number