Break TableReader MultiGet into filter and lookup stages (#10432)

Summary:
This PR is the first step in enhancing the coroutines MultiGet to be able to look up a batch in parallel across levels. By having a separate TableReader function for probing the bloom filters, we can quickly figure out which overlapping keys from a batch are definitely not in the file, and move on to the next level.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10432

Reviewed By: akankshamahajan15

Differential Revision: D38245910

Pulled By: anand1976

fbshipit-source-id: 3d20db2350378c3fe6f086f0c7ba5ff01d7f04de
This commit is contained in:
anand76 2022-08-04 12:51:57 -07:00 committed by Facebook GitHub Bot
parent 538df26fcc
commit bf4532eb5c
10 changed files with 175 additions and 20 deletions

View file

@ -2338,6 +2338,34 @@ TEST_F(DBMultiGetAsyncIOTest, GetFromL1AndL2) {
ASSERT_EQ(multiget_io_batch_size.count, 1);
ASSERT_EQ(multiget_io_batch_size.max, 2);
}
TEST_F(DBMultiGetAsyncIOTest, GetFromL2WithRangeOverlapL0L1) {
  // Keys 19 and 26 live in L2, but their range overlaps files in both
  // L0 and L1, so those levels are probed first. Their bloom filters
  // should rule the keys out without issuing coroutine lookups there.
  std::vector<std::string> key_strs{Key(19), Key(26)};
  std::vector<Slice> keys{key_strs[0], key_strs[1]};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());

  ReadOptions read_opts;
  read_opts.async_io = true;
  dbfull()->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data());

  ASSERT_EQ(values.size(), 2);
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::OK());
  ASSERT_EQ(values[0], "val_l2_" + std::to_string(19));
  ASSERT_EQ(values[1], "val_l2_" + std::to_string(26));
  // Only the two L2 lookups should have gone through the coroutine
  // path; the L0/L1 bloom filters filtered the batch out beforehand.
  ASSERT_EQ(statistics()->getTickerCount(MULTIGET_COROUTINE_COUNT), 2);
}
#endif // USE_COROUTINES
TEST_F(DBBasicTest, MultiGetStats) {

View file

@ -501,6 +501,48 @@ Status TableCache::Get(
return s;
}
// Probe the file's bloom filter for a batch of keys, trimming
// *mget_range to skip keys that are definitely not in this file.
// Returns Status::NotSupported() when the row cache is in use, since
// filtering must happen after the row cache lookup; the caller then
// falls back to the unfiltered MultiGet path. If the table reader had
// to be fetched from the table cache, the pinned cache handle is
// returned through *table_handle and should be passed back to
// MultiGet() so it can be released.
Status TableCache::MultiGetFilter(
    const ReadOptions& options,
    const InternalKeyComparator& internal_comparator,
    const FileMetaData& file_meta,
    const std::shared_ptr<const SliceTransform>& prefix_extractor,
    HistogramImpl* file_read_hist, int level,
    MultiGetContext::Range* mget_range, Cache::Handle** table_handle) {
  auto& fd = file_meta.fd;
#ifndef ROCKSDB_LITE
  // Check if we need to use the row cache. If yes, then we cannot do the
  // filtering here, since the filtering needs to happen after the row cache
  // lookup.
  KeyContext& first_key = *mget_range->begin();
  if (ioptions_.row_cache && !first_key.get_context->NeedToReadSequence()) {
    return Status::NotSupported();
  }
#endif  // ROCKSDB_LITE
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (t == nullptr) {
    // No cached reader on the file metadata; look it up (or open it)
    // through the table cache. The handle is handed to the caller even
    // on failure (it is nullptr in that case).
    s = FindTable(
        options, file_options_, internal_comparator, fd, &handle,
        prefix_extractor, options.read_tier == kBlockCacheTier /* no_io */,
        true /* record_read_stats */, file_read_hist, /*skip_filters=*/false,
        level, true /* prefetch_index_and_filter_in_cache */,
        /*max_file_size_for_l0_meta_pin=*/0, file_meta.temperature);
    if (s.ok()) {
      t = GetTableReaderFromHandle(handle);
    }
    *table_handle = handle;
  }
  if (s.ok()) {
    // Delegate the actual filter probe to the table reader.
    s = t->MultiGetFilter(options, prefix_extractor.get(), mget_range);
  }
  return s;
}
Status TableCache::GetTableProperties(
const FileOptions& file_options,
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,

View file

@ -107,6 +107,19 @@ class TableCache {
const FileMetaData& file_meta,
std::unique_ptr<FragmentedRangeTombstoneIterator>* out_iter);
// Call table reader's MultiGetFilter to use the bloom filter to filter out
// keys. Returns Status::NotSupported() if row cache needs to be checked.
// If the table cache is looked up to get the table reader, the cache handle
// is returned in table_handle. This handle should be passed back to
// MultiGet() so it can be released.
Status MultiGetFilter(
const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
HistogramImpl* file_read_hist, int level,
MultiGetContext::Range* mget_range, Cache::Handle** table_handle);
// If a seek to internal key "k" in specified file finds an entry,
// call get_context->SaveValue() repeatedly until
// it returns false. As a side effect, it will insert the TableReader
@ -122,7 +135,7 @@ class TableCache {
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
HistogramImpl* file_read_hist = nullptr, bool skip_filters = false,
int level = -1);
int level = -1, Cache::Handle* table_handle = nullptr);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);

View file

@ -17,13 +17,17 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
(const ReadOptions& options, const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters, int level) {
HistogramImpl* file_read_hist, bool skip_filters, int level,
Cache::Handle* table_handle) {
auto& fd = file_meta.fd;
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
Cache::Handle* handle = table_handle;
MultiGetRange table_range(*mget_range, mget_range->begin(),
mget_range->end());
if (handle != nullptr && t == nullptr) {
t = GetTableReaderFromHandle(handle);
}
#ifndef ROCKSDB_LITE
autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
IterKey row_cache_key;
@ -61,6 +65,7 @@ DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
// found in the row cache and thus the range may now be empty
if (s.ok() && !table_range.empty()) {
if (t == nullptr) {
assert(handle == nullptr);
s = FindTable(options, file_options_, internal_comparator, fd, &handle,
prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,

View file

@ -2215,11 +2215,13 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
if (!read_options.async_io || !using_coroutines() ||
fp.GetHitFileLevel() == 0 || !fp.RemainingOverlapInLevel()) {
if (f) {
bool skip_filters = IsFilterSkipped(
static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel());
// Call MultiGetFromSST for looking up a single file
s = MultiGetFromSST(read_options, fp.CurrentFileRange(),
fp.GetHitFileLevel(), fp.IsHitFileLastInLevel(), f,
blob_ctxs, num_filter_read, num_index_read,
num_sst_read);
fp.GetHitFileLevel(), skip_filters, f, blob_ctxs,
/*table_handle=*/nullptr, num_filter_read,
num_index_read, num_sst_read);
if (fp.GetHitFileLevel() == 0) {
dump_stats_for_l0_file = true;
}
@ -2231,16 +2233,39 @@ void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
} else {
std::vector<folly::coro::Task<Status>> mget_tasks;
while (f != nullptr) {
MultiGetRange file_range = fp.CurrentFileRange();
Cache::Handle* table_handle = nullptr;
bool skip_filters = IsFilterSkipped(
static_cast<int>(fp.GetHitFileLevel()), fp.IsHitFileLastInLevel());
if (!skip_filters) {
Status status = table_cache_->MultiGetFilter(
read_options, *internal_comparator(), *f->file_metadata,
mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
fp.GetHitFileLevel(), &file_range, &table_handle);
if (status.ok()) {
skip_filters = true;
} else if (!status.IsNotSupported()) {
s = status;
}
}
if (!s.ok()) {
break;
}
if (!file_range.empty()) {
mget_tasks.emplace_back(MultiGetFromSSTCoroutine(
read_options, fp.CurrentFileRange(), fp.GetHitFileLevel(),
fp.IsHitFileLastInLevel(), f, blob_ctxs, num_filter_read,
num_index_read, num_sst_read));
read_options, file_range, fp.GetHitFileLevel(), skip_filters, f,
blob_ctxs, table_handle, num_filter_read, num_index_read,
num_sst_read));
}
if (fp.KeyMaySpanNextFile()) {
break;
}
f = fp.GetNextFileInLevel();
}
if (mget_tasks.size() > 0) {
if (s.ok() && mget_tasks.size() > 0) {
RecordTick(db_statistics_, MULTIGET_COROUTINE_COUNT, mget_tasks.size());
// Collect all results so far
std::vector<Status> statuses = folly::coro::blockingWait(

View file

@ -990,10 +990,10 @@ class Version {
DECLARE_SYNC_AND_ASYNC(
/* ret_type */ Status, /* func_name */ MultiGetFromSST,
const ReadOptions& read_options, MultiGetRange file_range,
int hit_file_level, bool is_hit_file_last_in_level, FdWithKeyRange* f,
int hit_file_level, bool skip_filters, FdWithKeyRange* f,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
uint64_t& num_filter_read, uint64_t& num_index_read,
uint64_t& num_sst_read);
Cache::Handle* table_handle, uint64_t& num_filter_read,
uint64_t& num_index_read, uint64_t& num_sst_read);
ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs
Logger* info_log_;

View file

@ -13,9 +13,10 @@ namespace ROCKSDB_NAMESPACE {
// Lookup a batch of keys in a single SST file
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
bool is_hit_file_last_in_level, FdWithKeyRange* f,
bool skip_filters, FdWithKeyRange* f,
std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
uint64_t& num_filter_read, uint64_t& num_index_read, uint64_t& num_sst_read) {
Cache::Handle* table_handle, uint64_t& num_filter_read,
uint64_t& num_index_read, uint64_t& num_sst_read) {
bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
@ -24,10 +25,8 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
s = CO_AWAIT(table_cache_->MultiGet)(
read_options, *internal_comparator(), *f->file_metadata, &file_range,
mutable_cf_options_.prefix_extractor,
cfd_->internal_stats()->GetFileReadHist(hit_file_level),
IsFilterSkipped(static_cast<int>(hit_file_level),
is_hit_file_last_in_level),
hit_file_level);
cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
hit_file_level, table_handle);
// TODO: examine the behavior for corrupted key
if (timer_enabled) {
PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),

View file

@ -2273,6 +2273,36 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
return s;
}
// Use this table's filter block, if present, to drop keys from
// *mget_range that are definitely not in the file. Keys with a
// negative filter result are skipped in the range; the remaining keys
// still require a full lookup. Always returns OK.
Status BlockBasedTable::MultiGetFilter(const ReadOptions& read_options,
                                       const SliceTransform* prefix_extractor,
                                       MultiGetRange* mget_range) {
  if (mget_range->empty()) {
    // Callers are expected to pass a non-empty batch; an empty one is a
    // wasted call (performance bug), so flag it in debug builds.
    assert(false);
    return Status::OK();
  }

  FilterBlockReader* const filter = rep_->filter.get();
  if (filter == nullptr) {
    // This table has no filter block, so nothing can be pruned.
    return Status::OK();
  }

  // Probe the full (whole-file) filter for the batch.
  // If full filter not useful, then go into each block.
  const bool no_io = read_options.read_tier == kBlockCacheTier;
  uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
  if (mget_range->begin()->get_context != nullptr) {
    tracing_mget_id = mget_range->begin()->get_context->get_tracing_get_id();
  }
  BlockCacheLookupContext lookup_context{
      TableReaderCaller::kUserMultiGet, tracing_mget_id,
      /*_get_from_user_specified_snapshot=*/read_options.snapshot != nullptr};
  // Negative keys are removed from mget_range in place.
  FullFilterKeysMayMatch(filter, mget_range, no_io, prefix_extractor,
                         &lookup_context, read_options.rate_limiter_priority);
  return Status::OK();
}
Status BlockBasedTable::Prefetch(const Slice* const begin,
const Slice* const end) {
auto& comparator = rep_->internal_comparator;

View file

@ -141,6 +141,10 @@ class BlockBasedTable : public TableReader {
GetContext* get_context, const SliceTransform* prefix_extractor,
bool skip_filters = false) override;
Status MultiGetFilter(const ReadOptions& read_options,
const SliceTransform* prefix_extractor,
MultiGetRange* mget_range) override;
DECLARE_SYNC_AND_ASYNC_OVERRIDE(void, MultiGet,
const ReadOptions& readOptions,
const MultiGetContext::Range* mget_range,

View file

@ -128,6 +128,15 @@ class TableReader {
const SliceTransform* prefix_extractor,
bool skip_filters = false) = 0;
// Use bloom filters in the table file, if present, to filter out keys. The
// mget_range will be updated to skip keys that get a negative result from
// the filter lookup.
//
// This default implementation returns Status::NotSupported(), signaling
// to the caller (see TableCache::MultiGetFilter) that this table format
// cannot pre-filter a batch and the unfiltered lookup path must be used.
// Table formats that support batched filtering (e.g. BlockBasedTable)
// override this method.
virtual Status MultiGetFilter(const ReadOptions& /*readOptions*/,
const SliceTransform* /*prefix_extractor*/,
MultiGetContext::Range* /*mget_range*/) {
return Status::NotSupported();
}
virtual void MultiGet(const ReadOptions& readOptions,
const MultiGetContext::Range* mget_range,
const SliceTransform* prefix_extractor,