diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 93342d3642..bfbbf1d693 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2517,12 +2517,12 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key, return s; } -template -Status DBImpl::MultiCFSnapshot( - const ReadOptions& read_options, ReadCallback* callback, - std::function& - iter_deref_func, - T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local) { +template +Status DBImpl::MultiCFSnapshot(const ReadOptions& read_options, + ReadCallback* callback, + IterDerefFuncType iter_deref_func, T* cf_list, + SequenceNumber* snapshot, + bool* sv_from_thread_local) { PERF_TIMER_GUARD(get_snapshot_time); assert(sv_from_thread_local); @@ -2770,37 +2770,36 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, } PrepareMultiGetKeys(num_keys, sorted_input, &sorted_keys); - autovector - multiget_cf_data; + autovector + key_range_per_cf; + autovector + cfd_sv_pairs; size_t cf_start = 0; ColumnFamilyHandle* cf = sorted_keys[0]->column_family; for (size_t i = 0; i < num_keys; ++i) { KeyContext* key_ctx = sorted_keys[i]; if (key_ctx->column_family != cf) { - multiget_cf_data.emplace_back(cf, cf_start, i - cf_start, nullptr); + key_range_per_cf.emplace_back(cf_start, i - cf_start); + cfd_sv_pairs.emplace_back(cf, nullptr); cf_start = i; cf = key_ctx->column_family; } } - multiget_cf_data.emplace_back(cf, cf_start, num_keys - cf_start, nullptr); + key_range_per_cf.emplace_back(cf_start, num_keys - cf_start); + cfd_sv_pairs.emplace_back(cf, nullptr); - std::function::iterator&)> - iter_deref_lambda = - [](autovector::iterator& cf_iter) { - return &(*cf_iter); - }; - - SequenceNumber consistent_seqnum; + SequenceNumber consistent_seqnum = kMaxSequenceNumber; bool sv_from_thread_local; - Status s = MultiCFSnapshot< - autovector>( - read_options, nullptr, iter_deref_lambda, &multiget_cf_data, - &consistent_seqnum, &sv_from_thread_local); + Status s = MultiCFSnapshot>( + read_options, nullptr, + [](autovector::iterator& cf_iter) { + return &(*cf_iter); + }, + &cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local); if (!s.ok()) { for (size_t i = 0; i < num_keys; ++i) { @@ -2818,31 +2817,40 @@ void DBImpl::MultiGetCommon(const ReadOptions& read_options, read_callback = ×tamp_read_callback; } - auto cf_iter = multiget_cf_data.begin(); - for (; cf_iter != multiget_cf_data.end(); ++cf_iter) { - s = MultiGetImpl(read_options, cf_iter->start, cf_iter->num_keys, - &sorted_keys, cf_iter->super_version, consistent_seqnum, + assert(key_range_per_cf.size() == cfd_sv_pairs.size()); + auto key_range_per_cf_iter = key_range_per_cf.begin(); + auto cfd_sv_pair_iter = cfd_sv_pairs.begin(); + while (key_range_per_cf_iter != key_range_per_cf.end() && + cfd_sv_pair_iter != cfd_sv_pairs.end()) { + s = MultiGetImpl(read_options, key_range_per_cf_iter->start, + key_range_per_cf_iter->num_keys, &sorted_keys, + cfd_sv_pair_iter->super_version, consistent_seqnum, read_callback); if (!s.ok()) { break; } + ++key_range_per_cf_iter; + ++cfd_sv_pair_iter; } if (!s.ok()) { assert(s.IsTimedOut() || s.IsAborted()); - for (++cf_iter; cf_iter != multiget_cf_data.end(); ++cf_iter) { - for (size_t i = cf_iter->start; i < cf_iter->start + cf_iter->num_keys; + for (++key_range_per_cf_iter; + key_range_per_cf_iter != key_range_per_cf.end(); + ++key_range_per_cf_iter) { + for (size_t i = key_range_per_cf_iter->start; + i < key_range_per_cf_iter->start + key_range_per_cf_iter->num_keys; ++i) { *sorted_keys[i]->s = s; } } } - for (const auto& iter : multiget_cf_data) { + for (const auto& cfd_sv_pair : cfd_sv_pairs) { if (sv_from_thread_local) { - ReturnAndCleanupSuperVersion(iter.cfd, iter.super_version); + ReturnAndCleanupSuperVersion(cfd_sv_pair.cfd, cfd_sv_pair.super_version); } else { TEST_SYNC_POINT("DBImpl::MultiGet::BeforeLastTryUnRefSV"); - CleanupSuperVersion(iter.super_version); + CleanupSuperVersion(cfd_sv_pair.super_version); } } } @@ -2974,21 +2982,17 @@ void DBImpl::MultiGetWithCallbackImpl( const ReadOptions& read_options, ColumnFamilyHandle* column_family, ReadCallback* callback, autovector* sorted_keys) { - std::array multiget_cf_data; - multiget_cf_data[0] = MultiGetColumnFamilyData(column_family, nullptr); - std::function::iterator&)> - iter_deref_lambda = - [](std::array::iterator& cf_iter) { - return &(*cf_iter); - }; - + std::array cfd_sv_pairs; + cfd_sv_pairs[0] = ColumnFamilyDataSuperVersionPair(column_family, nullptr); size_t num_keys = sorted_keys->size(); - SequenceNumber consistent_seqnum; + SequenceNumber consistent_seqnum = kMaxSequenceNumber; bool sv_from_thread_local; - Status s = MultiCFSnapshot>( - read_options, callback, iter_deref_lambda, &multiget_cf_data, - &consistent_seqnum, &sv_from_thread_local); + Status s = MultiCFSnapshot>( + read_options, callback, + [](std::array::iterator& cf_iter) { + return &(*cf_iter); + }, + &cfd_sv_pairs, &consistent_seqnum, &sv_from_thread_local); if (!s.ok()) { return; } @@ -3027,11 +3031,11 @@ void DBImpl::MultiGetWithCallbackImpl( } s = MultiGetImpl(read_options, 0, num_keys, sorted_keys, - multiget_cf_data[0].super_version, consistent_seqnum, + cfd_sv_pairs[0].super_version, consistent_seqnum, read_callback); assert(s.ok() || s.IsTimedOut() || s.IsAborted()); - ReturnAndCleanupSuperVersion(multiget_cf_data[0].cfd, - multiget_cf_data[0].super_version); + ReturnAndCleanupSuperVersion(cfd_sv_pairs[0].cfd, + cfd_sv_pairs[0].super_version); } // The actual implementation of batched MultiGet. Parameters - diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e0ac43ddb9..9803a39afc 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2337,10 +2337,7 @@ class DBImpl : public DB { // A structure to hold the information required to process MultiGet of keys // belonging to one column family. For a multi column family MultiGet, there // will be a container of these objects. - struct MultiGetColumnFamilyData { - ColumnFamilyHandle* cf; - ColumnFamilyData* cfd; - + struct MultiGetKeyRangePerCf { // For the batched MultiGet which relies on sorted keys, start specifies // the index of first key belonging to this column family in the sorted // list. @@ -2350,31 +2347,31 @@ class DBImpl : public DB { // belonging to this column family in the sorted list size_t num_keys; + MultiGetKeyRangePerCf() : start(0), num_keys(0) {} + + MultiGetKeyRangePerCf(size_t first, size_t count) + : start(first), num_keys(count) {} + }; + + // A structure to contain ColumnFamilyData and the SuperVersion obtained for + // the consistent view of DB + struct ColumnFamilyDataSuperVersionPair { + ColumnFamilyData* cfd; + // SuperVersion for the column family obtained in a manner that ensures a // consistent view across all column families in the DB SuperVersion* super_version; - MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, - SuperVersion* sv) - : cf(column_family), - cfd(static_cast(cf)->cfd()), - start(0), - num_keys(0), + ColumnFamilyDataSuperVersionPair(ColumnFamilyHandle* column_family, + SuperVersion* sv) + : cfd(static_cast(column_family)->cfd()), super_version(sv) {} - MultiGetColumnFamilyData(ColumnFamilyHandle* column_family, size_t first, - size_t count, SuperVersion* sv) - : cf(column_family), - cfd(static_cast(cf)->cfd()), - start(first), - num_keys(count), - super_version(sv) {} - - MultiGetColumnFamilyData() = default; + ColumnFamilyDataSuperVersionPair() = default; }; // A common function to obtain a consistent snapshot, which can be implicit // if the user doesn't specify a snapshot in read_options, across - // multiple column families for MultiGet. It will attempt to get an implicit + // multiple column families. It will attempt to get an implicit // snapshot without acquiring the db_mutes, but will give up after a few // tries and acquire the mutex if a memtable flush happens. The template // allows both the batched and non-batched MultiGet to call this with @@ -2389,12 +2386,11 @@ class DBImpl : public DB { // A non-OK status will be returned if for a column family that enables // user-defined timestamp feature, the specified `ReadOptions.timestamp` // attemps to read collapsed history. - template - Status MultiCFSnapshot( - const ReadOptions& read_options, ReadCallback* callback, - std::function& - iter_deref_func, - T* cf_list, SequenceNumber* snapshot, bool* sv_from_thread_local); + template + Status MultiCFSnapshot(const ReadOptions& read_options, + ReadCallback* callback, + IterDerefFuncType iter_deref_func, T* cf_list, + SequenceNumber* snapshot, bool* sv_from_thread_local); // The actual implementation of the batching MultiGet. The caller is expected // to have acquired the SuperVersion and pass in a snapshot sequence number