mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 22:41:48 +00:00
bf98dcf9a8
Summary: The original goal is to propagate failures from `GetContext::SaveValue()` -> `GetContext::GetBlobValue()` -> `BlobFetcher::FetchBlob()` up to the user. This call sequence happens when a merge chain ends with a base value in a blob file. There's also fixes for bugs encountered along the way where non-ok statuses were ignored/overwritten, and a bit of plumbing work for functions that had no capability to return a status. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12462 Test Plan: A repro command ``` db=/dev/shm/dbstress_db ; exp=/dev/shm/dbstress_exp ; rm -rf $db $exp ; mkdir -p $db $exp ./db_stress \ --clear_column_family_one_in=0 \ --test_batches_snapshots=0 \ --write_fault_one_in=0 \ --use_put_entity_one_in=0 \ --prefixpercent=0 \ --read_fault_one_in=0 \ --readpercent=0 \ --reopen=0 \ --set_options_one_in=10000 \ --delpercent=0 \ --delrangepercent=0 \ --open_metadata_write_fault_one_in=0 \ --open_read_fault_one_in=0 \ --open_write_fault_one_in=0 \ --destroy_db_initially=0 \ --ingest_external_file_one_in=0 \ --iterpercent=0 \ --nooverwritepercent=0 \ --db=$db \ --enable_blob_files=1 \ --expected_values_dir=$exp \ --max_background_compactions=20 \ --max_bytes_for_level_base=2097152 \ --max_key=100000 \ --min_blob_size=0 \ --open_files=-1 \ --ops_per_thread=100000000 \ --prefix_size=-1 \ --target_file_size_base=524288 \ --use_merge=1 \ --value_size_mult=32 \ --write_buffer_size=524288 \ --writepercent=100 ``` It used to fail like: ``` ... frame https://github.com/facebook/rocksdb/issues/9: 0x00007fc63903bc93 libc.so.6`__GI___assert_fail(assertion="HasDefaultColumn(columns)", file="fbcode/internal_repo_rocksdb/repo/db/wide/wide_columns_helper.h", line=33, function="static const rocksdb::Slice &rocksdb::WideColumnsHelper::GetDefaultColumn(const rocksdb::WideColumns &)") at assert.c:101:3 frame https://github.com/facebook/rocksdb/issues/10: 0x00000000006f7e92 db_stress`rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, rocksdb::PinnableWideColumns*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, rocksdb::PinnedIteratorsManager*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) [inlined] rocksdb::WideColumnsHelper::GetDefaultColumn(columns=size=0) at wide_columns_helper.h:33 frame https://github.com/facebook/rocksdb/issues/11: 0x00000000006f7e76 db_stress`rocksdb::Version::Get(this=0x00007fc5ec763000, read_options=<unavailable>, k=<unavailable>, value=0x0000000000000000, columns=0x00007fc6035fd1d8, timestamp=<unavailable>, status=0x00007fc6035fd250, merge_context=0x00007fc6035fce40, max_covering_tombstone_seq=0x00007fc6035fce90, pinned_iters_mgr=0x00007fc6035fcdf0, value_found=0x0000000000000000, key_exists=0x0000000000000000, seq=0x0000000000000000, callback=0x0000000000000000, is_blob=0x0000000000000000, do_merge=<unavailable>) at version_set.cc:2492 frame https://github.com/facebook/rocksdb/issues/12: 0x000000000051e245 db_stress`rocksdb::DBImpl::GetImpl(this=0x00007fc637a86000, read_options=0x00007fc6035fcf60, key=<unavailable>, get_impl_options=0x00007fc6035fd000) at db_impl.cc:2408 frame https://github.com/facebook/rocksdb/issues/13: 0x000000000050cec2 db_stress`rocksdb::DBImpl::GetEntity(this=0x00007fc637a86000, _read_options=<unavailable>, column_family=<unavailable>, key=0x00007fc6035fd3c8, columns=0x00007fc6035fd1d8) at db_impl.cc:2109 frame https://github.com/facebook/rocksdb/issues/14: 0x000000000074f688 db_stress`rocksdb::(anonymous namespace)::MemTableInserter::MergeCF(this=0x00007fc6035fd450, column_family_id=2, key=0x00007fc6035fd3c8, value=0x00007fc6035fd3a0) at write_batch.cc:2656 frame https://github.com/facebook/rocksdb/issues/15: 0x00000000007476fc db_stress`rocksdb::WriteBatchInternal::Iterate(wb=0x00007fc6035fe698, handler=0x00007fc6035fd450, begin=12, end=<unavailable>) at write_batch.cc:607 frame https://github.com/facebook/rocksdb/issues/16: 0x000000000074d7dd db_stress`rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) [inlined] rocksdb::WriteBatch::Iterate(this=<unavailable>, handler=0x00007fc6035fd450) const at write_batch.cc:505 frame https://github.com/facebook/rocksdb/issues/17: 0x000000000074d77b db_stress`rocksdb::WriteBatchInternal::InsertInto(write_group=<unavailable>, sequence=<unavailable>, memtables=<unavailable>, flush_scheduler=<unavailable>, trim_history_scheduler=<unavailable>, ignore_missing_column_families=<unavailable>, recovery_log_number=0, db=0x00007fc637a86000, concurrent_memtable_writes=<unavailable>, seq_per_batch=false, batch_per_txn=<unavailable>) at write_batch.cc:3084 frame https://github.com/facebook/rocksdb/issues/18: 0x0000000000631d77 db_stress`rocksdb::DBImpl::PipelinedWriteImpl(this=0x00007fc637a86000, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000) at db_impl_write.cc:807 frame https://github.com/facebook/rocksdb/issues/19: 0x000000000062ceeb db_stress`rocksdb::DBImpl::WriteImpl(this=<unavailable>, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000, batch_cnt=0, pre_release_callback=0x0000000000000000, post_memtable_callback=0x0000000000000000) at db_impl_write.cc:312 frame https://github.com/facebook/rocksdb/issues/20: 0x000000000062c8ec db_stress`rocksdb::DBImpl::Write(this=0x00007fc637a86000, write_options=0x00007fc6035feca8, my_batch=0x00007fc6035fe698) at db_impl_write.cc:157 frame https://github.com/facebook/rocksdb/issues/21: 0x000000000062b847 db_stress`rocksdb::DB::Merge(this=0x00007fc637a86000, opt=0x00007fc6035feca8, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, value=0x00007fc6035fe830) at db_impl_write.cc:2544 frame https://github.com/facebook/rocksdb/issues/22: 0x000000000062b6ef db_stress`rocksdb::DBImpl::Merge(this=0x00007fc637a86000, o=<unavailable>, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, val=0x00007fc6035fe830) at db_impl_write.cc:72 frame https://github.com/facebook/rocksdb/issues/23: 0x00000000004d6397 db_stress`rocksdb::NonBatchedOpsStressTest::TestPut(this=0x00007fc637041000, thread=0x00007fc6370dbc00, write_opts=0x00007fc6035feca8, read_opts=0x00007fc6035fe9c8, rand_column_families=<unavailable>, rand_keys=size=1, value={P\xe9_\x03\xc6\x7f\0\0}) at no_batched_ops_stress.cc:1317 frame https://github.com/facebook/rocksdb/issues/24: 0x000000000049361d db_stress`rocksdb::StressTest::OperateDb(this=0x00007fc637041000, thread=0x00007fc6370dbc00) at db_stress_test_base.cc:1148 ... ``` Reviewed By: ltamasi Differential Revision: D55157795 Pulled By: ajkr fbshipit-source-id: 5f7c1380ead5794c29d41680028e34b839744764
136 lines
5 KiB
C++
136 lines
5 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
//
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "util/coro_utils.h"
|
|
|
|
#if defined(WITHOUT_COROUTINES) || \
|
|
(defined(USE_COROUTINES) && defined(WITH_COROUTINES))
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
#if defined(WITHOUT_COROUTINES)
|
|
#endif
|
|
|
|
// Batched version of TableCache::MultiGet.
|
|
DEFINE_SYNC_AND_ASYNC(Status, TableCache::MultiGet)
|
|
(const ReadOptions& options, const InternalKeyComparator& internal_comparator,
|
|
const FileMetaData& file_meta, const MultiGetContext::Range* mget_range,
|
|
uint8_t block_protection_bytes_per_key,
|
|
const std::shared_ptr<const SliceTransform>& prefix_extractor,
|
|
HistogramImpl* file_read_hist, bool skip_filters, bool skip_range_deletions,
|
|
int level, TypedHandle* handle) {
|
|
auto& fd = file_meta.fd;
|
|
Status s;
|
|
TableReader* t = fd.table_reader;
|
|
MultiGetRange table_range(*mget_range, mget_range->begin(),
|
|
mget_range->end());
|
|
if (handle != nullptr && t == nullptr) {
|
|
t = cache_.Value(handle);
|
|
}
|
|
autovector<std::string, MultiGetContext::MAX_BATCH_SIZE> row_cache_entries;
|
|
IterKey row_cache_key;
|
|
size_t row_cache_key_prefix_size = 0;
|
|
KeyContext& first_key = *table_range.begin();
|
|
bool lookup_row_cache =
|
|
ioptions_.row_cache && !first_key.get_context->NeedToReadSequence();
|
|
|
|
// Check row cache if enabled. Since row cache does not currently store
|
|
// sequence numbers, we cannot use it if we need to fetch the sequence.
|
|
if (lookup_row_cache) {
|
|
GetContext* first_context = first_key.get_context;
|
|
CreateRowCacheKeyPrefix(options, fd, first_key.ikey, first_context,
|
|
row_cache_key);
|
|
row_cache_key_prefix_size = row_cache_key.Size();
|
|
|
|
for (auto miter = table_range.begin(); miter != table_range.end();
|
|
++miter) {
|
|
const Slice& user_key = miter->ukey_with_ts;
|
|
|
|
GetContext* get_context = miter->get_context;
|
|
|
|
Status read_status;
|
|
bool ret =
|
|
GetFromRowCache(user_key, row_cache_key, row_cache_key_prefix_size,
|
|
get_context, &read_status);
|
|
if (!read_status.ok()) {
|
|
CO_RETURN read_status;
|
|
}
|
|
if (ret) {
|
|
table_range.SkipKey(miter);
|
|
} else {
|
|
row_cache_entries.emplace_back();
|
|
get_context->SetReplayLog(&(row_cache_entries.back()));
|
|
}
|
|
}
|
|
}
|
|
|
|
// Check that table_range is not empty. Its possible all keys may have been
|
|
// found in the row cache and thus the range may now be empty
|
|
if (s.ok() && !table_range.empty()) {
|
|
if (t == nullptr) {
|
|
assert(handle == nullptr);
|
|
s = FindTable(options, file_options_, internal_comparator, file_meta,
|
|
&handle, block_protection_bytes_per_key, prefix_extractor,
|
|
options.read_tier == kBlockCacheTier /* no_io */,
|
|
file_read_hist, skip_filters, level,
|
|
true /* prefetch_index_and_filter_in_cache */,
|
|
0 /*max_file_size_for_l0_meta_pin*/, file_meta.temperature);
|
|
TEST_SYNC_POINT_CALLBACK("TableCache::MultiGet:FindTable", &s);
|
|
if (s.ok()) {
|
|
t = cache_.Value(handle);
|
|
assert(t);
|
|
}
|
|
}
|
|
if (s.ok() && !options.ignore_range_deletions && !skip_range_deletions) {
|
|
UpdateRangeTombstoneSeqnums(options, t, table_range);
|
|
}
|
|
if (s.ok()) {
|
|
CO_AWAIT(t->MultiGet)
|
|
(options, &table_range, prefix_extractor.get(), skip_filters);
|
|
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
|
|
for (auto iter = table_range.begin(); iter != table_range.end(); ++iter) {
|
|
Status* status = iter->s;
|
|
if (status->IsIncomplete()) {
|
|
// Couldn't find Table in cache but treat as kFound if no_io set
|
|
iter->get_context->MarkKeyMayExist();
|
|
s = Status::OK();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (lookup_row_cache) {
|
|
size_t row_idx = 0;
|
|
RowCacheInterface row_cache{ioptions_.row_cache.get()};
|
|
|
|
for (auto miter = table_range.begin(); miter != table_range.end();
|
|
++miter) {
|
|
std::string& row_cache_entry = row_cache_entries[row_idx++];
|
|
const Slice& user_key = miter->ukey_with_ts;
|
|
GetContext* get_context = miter->get_context;
|
|
|
|
get_context->SetReplayLog(nullptr);
|
|
// Compute row cache key.
|
|
row_cache_key.TrimAppend(row_cache_key_prefix_size, user_key.data(),
|
|
user_key.size());
|
|
// Put the replay log in row cache only if something was found.
|
|
if (s.ok() && !row_cache_entry.empty()) {
|
|
size_t charge = row_cache_entry.capacity() + sizeof(std::string);
|
|
auto row_ptr = new std::string(std::move(row_cache_entry));
|
|
// If row cache is full, it's OK.
|
|
row_cache.Insert(row_cache_key.GetUserKey(), row_ptr, charge)
|
|
.PermitUncheckedError();
|
|
}
|
|
}
|
|
}
|
|
|
|
if (handle != nullptr) {
|
|
cache_.Release(handle);
|
|
}
|
|
CO_RETURN s;
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
#endif
|