mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-28 15:33:54 +00:00
bf98dcf9a8
Summary: The original goal is to propagate failures from `GetContext::SaveValue()` -> `GetContext::GetBlobValue()` -> `BlobFetcher::FetchBlob()` up to the user. This call sequence happens when a merge chain ends with a base value in a blob file. There's also fixes for bugs encountered along the way where non-ok statuses were ignored/overwritten, and a bit of plumbing work for functions that had no capability to return a status. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12462 Test Plan: A repro command ``` db=/dev/shm/dbstress_db ; exp=/dev/shm/dbstress_exp ; rm -rf $db $exp ; mkdir -p $db $exp ./db_stress \ --clear_column_family_one_in=0 \ --test_batches_snapshots=0 \ --write_fault_one_in=0 \ --use_put_entity_one_in=0 \ --prefixpercent=0 \ --read_fault_one_in=0 \ --readpercent=0 \ --reopen=0 \ --set_options_one_in=10000 \ --delpercent=0 \ --delrangepercent=0 \ --open_metadata_write_fault_one_in=0 \ --open_read_fault_one_in=0 \ --open_write_fault_one_in=0 \ --destroy_db_initially=0 \ --ingest_external_file_one_in=0 \ --iterpercent=0 \ --nooverwritepercent=0 \ --db=$db \ --enable_blob_files=1 \ --expected_values_dir=$exp \ --max_background_compactions=20 \ --max_bytes_for_level_base=2097152 \ --max_key=100000 \ --min_blob_size=0 \ --open_files=-1 \ --ops_per_thread=100000000 \ --prefix_size=-1 \ --target_file_size_base=524288 \ --use_merge=1 \ --value_size_mult=32 \ --write_buffer_size=524288 \ --writepercent=100 ``` It used to fail like: ``` ... frame https://github.com/facebook/rocksdb/issues/9: 0x00007fc63903bc93 libc.so.6`__GI___assert_fail(assertion="HasDefaultColumn(columns)", file="fbcode/internal_repo_rocksdb/repo/db/wide/wide_columns_helper.h", line=33, function="static const rocksdb::Slice &rocksdb::WideColumnsHelper::GetDefaultColumn(const rocksdb::WideColumns &)") at assert.c:101:3 frame https://github.com/facebook/rocksdb/issues/10: 0x00000000006f7e92 db_stress`rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, rocksdb::PinnableWideColumns*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, rocksdb::PinnedIteratorsManager*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) [inlined] rocksdb::WideColumnsHelper::GetDefaultColumn(columns=size=0) at wide_columns_helper.h:33 frame https://github.com/facebook/rocksdb/issues/11: 0x00000000006f7e76 db_stress`rocksdb::Version::Get(this=0x00007fc5ec763000, read_options=<unavailable>, k=<unavailable>, value=0x0000000000000000, columns=0x00007fc6035fd1d8, timestamp=<unavailable>, status=0x00007fc6035fd250, merge_context=0x00007fc6035fce40, max_covering_tombstone_seq=0x00007fc6035fce90, pinned_iters_mgr=0x00007fc6035fcdf0, value_found=0x0000000000000000, key_exists=0x0000000000000000, seq=0x0000000000000000, callback=0x0000000000000000, is_blob=0x0000000000000000, do_merge=<unavailable>) at version_set.cc:2492 frame https://github.com/facebook/rocksdb/issues/12: 0x000000000051e245 db_stress`rocksdb::DBImpl::GetImpl(this=0x00007fc637a86000, read_options=0x00007fc6035fcf60, key=<unavailable>, get_impl_options=0x00007fc6035fd000) at db_impl.cc:2408 frame https://github.com/facebook/rocksdb/issues/13: 0x000000000050cec2 db_stress`rocksdb::DBImpl::GetEntity(this=0x00007fc637a86000, _read_options=<unavailable>, column_family=<unavailable>, key=0x00007fc6035fd3c8, columns=0x00007fc6035fd1d8) at db_impl.cc:2109 frame https://github.com/facebook/rocksdb/issues/14: 0x000000000074f688 db_stress`rocksdb::(anonymous namespace)::MemTableInserter::MergeCF(this=0x00007fc6035fd450, column_family_id=2, key=0x00007fc6035fd3c8, value=0x00007fc6035fd3a0) at write_batch.cc:2656 frame https://github.com/facebook/rocksdb/issues/15: 0x00000000007476fc db_stress`rocksdb::WriteBatchInternal::Iterate(wb=0x00007fc6035fe698, handler=0x00007fc6035fd450, begin=12, end=<unavailable>) at write_batch.cc:607 frame https://github.com/facebook/rocksdb/issues/16: 0x000000000074d7dd db_stress`rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) [inlined] rocksdb::WriteBatch::Iterate(this=<unavailable>, handler=0x00007fc6035fd450) const at write_batch.cc:505 frame https://github.com/facebook/rocksdb/issues/17: 0x000000000074d77b db_stress`rocksdb::WriteBatchInternal::InsertInto(write_group=<unavailable>, sequence=<unavailable>, memtables=<unavailable>, flush_scheduler=<unavailable>, trim_history_scheduler=<unavailable>, ignore_missing_column_families=<unavailable>, recovery_log_number=0, db=0x00007fc637a86000, concurrent_memtable_writes=<unavailable>, seq_per_batch=false, batch_per_txn=<unavailable>) at write_batch.cc:3084 frame https://github.com/facebook/rocksdb/issues/18: 0x0000000000631d77 db_stress`rocksdb::DBImpl::PipelinedWriteImpl(this=0x00007fc637a86000, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000) at db_impl_write.cc:807 frame https://github.com/facebook/rocksdb/issues/19: 0x000000000062ceeb db_stress`rocksdb::DBImpl::WriteImpl(this=<unavailable>, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000, batch_cnt=0, pre_release_callback=0x0000000000000000, post_memtable_callback=0x0000000000000000) at db_impl_write.cc:312 frame https://github.com/facebook/rocksdb/issues/20: 0x000000000062c8ec db_stress`rocksdb::DBImpl::Write(this=0x00007fc637a86000, write_options=0x00007fc6035feca8, my_batch=0x00007fc6035fe698) at db_impl_write.cc:157 frame https://github.com/facebook/rocksdb/issues/21: 0x000000000062b847 db_stress`rocksdb::DB::Merge(this=0x00007fc637a86000, opt=0x00007fc6035feca8, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, value=0x00007fc6035fe830) at db_impl_write.cc:2544 frame https://github.com/facebook/rocksdb/issues/22: 0x000000000062b6ef db_stress`rocksdb::DBImpl::Merge(this=0x00007fc637a86000, o=<unavailable>, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, val=0x00007fc6035fe830) at db_impl_write.cc:72 frame https://github.com/facebook/rocksdb/issues/23: 0x00000000004d6397 db_stress`rocksdb::NonBatchedOpsStressTest::TestPut(this=0x00007fc637041000, thread=0x00007fc6370dbc00, write_opts=0x00007fc6035feca8, read_opts=0x00007fc6035fe9c8, rand_column_families=<unavailable>, rand_keys=size=1, value={P\xe9_\x03\xc6\x7f\0\0}) at no_batched_ops_stress.cc:1317 frame https://github.com/facebook/rocksdb/issues/24: 0x000000000049361d db_stress`rocksdb::StressTest::OperateDb(this=0x00007fc637041000, thread=0x00007fc6370dbc00) at db_stress_test_base.cc:1148 ... ``` Reviewed By: ltamasi Differential Revision: D55157795 Pulled By: ajkr fbshipit-source-id: 5f7c1380ead5794c29d41680028e34b839744764
623 lines
23 KiB
C++
623 lines
23 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#include "table/get_context.h"
|
|
|
|
#include "db/blob//blob_fetcher.h"
|
|
#include "db/merge_helper.h"
|
|
#include "db/pinned_iterators_manager.h"
|
|
#include "db/read_callback.h"
|
|
#include "db/wide/wide_column_serialization.h"
|
|
#include "monitoring/file_read_sample.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/statistics_impl.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/system_clock.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
GetContext::GetContext(
|
|
const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
|
|
Statistics* statistics, GetState init_state, const Slice& user_key,
|
|
PinnableSlice* pinnable_val, PinnableWideColumns* columns,
|
|
std::string* timestamp, bool* value_found, MergeContext* merge_context,
|
|
bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
|
|
SystemClock* clock, SequenceNumber* seq,
|
|
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
|
|
bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
|
|
: ucmp_(ucmp),
|
|
merge_operator_(merge_operator),
|
|
logger_(logger),
|
|
statistics_(statistics),
|
|
state_(init_state),
|
|
user_key_(user_key),
|
|
pinnable_val_(pinnable_val),
|
|
columns_(columns),
|
|
timestamp_(timestamp),
|
|
value_found_(value_found),
|
|
merge_context_(merge_context),
|
|
max_covering_tombstone_seq_(_max_covering_tombstone_seq),
|
|
clock_(clock),
|
|
seq_(seq),
|
|
replay_log_(nullptr),
|
|
pinned_iters_mgr_(_pinned_iters_mgr),
|
|
callback_(callback),
|
|
do_merge_(do_merge),
|
|
is_blob_index_(is_blob_index),
|
|
tracing_get_id_(tracing_get_id),
|
|
blob_fetcher_(blob_fetcher) {
|
|
if (seq_) {
|
|
*seq_ = kMaxSequenceNumber;
|
|
}
|
|
sample_ = should_sample_file_read();
|
|
}
|
|
|
|
GetContext::GetContext(const Comparator* ucmp,
|
|
const MergeOperator* merge_operator, Logger* logger,
|
|
Statistics* statistics, GetState init_state,
|
|
const Slice& user_key, PinnableSlice* pinnable_val,
|
|
PinnableWideColumns* columns, bool* value_found,
|
|
MergeContext* merge_context, bool do_merge,
|
|
SequenceNumber* _max_covering_tombstone_seq,
|
|
SystemClock* clock, SequenceNumber* seq,
|
|
PinnedIteratorsManager* _pinned_iters_mgr,
|
|
ReadCallback* callback, bool* is_blob_index,
|
|
uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
|
|
: GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
|
|
pinnable_val, columns, /*timestamp=*/nullptr, value_found,
|
|
merge_context, do_merge, _max_covering_tombstone_seq, clock,
|
|
seq, _pinned_iters_mgr, callback, is_blob_index,
|
|
tracing_get_id, blob_fetcher) {}
|
|
|
|
void GetContext::appendToReplayLog(ValueType type, Slice value, Slice ts) {
|
|
if (replay_log_) {
|
|
if (replay_log_->empty()) {
|
|
// Optimization: in the common case of only one operation in the
|
|
// log, we allocate the exact amount of space needed.
|
|
replay_log_->reserve(1 + VarintLength(value.size()) + value.size());
|
|
}
|
|
replay_log_->push_back(type);
|
|
PutLengthPrefixedSlice(replay_log_, value);
|
|
|
|
// If cf enables ts, there should always be a ts following each value
|
|
if (ucmp_->timestamp_size() > 0) {
|
|
assert(ts.size() == ucmp_->timestamp_size());
|
|
PutLengthPrefixedSlice(replay_log_, ts);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Called from TableCache::Get and Table::Get when file/block in which
|
|
// key may exist are not there in TableCache/BlockCache respectively. In this
|
|
// case we can't guarantee that key does not exist and are not permitted to do
|
|
// IO to be certain.Set the status=kFound and value_found=false to let the
|
|
// caller know that key may exist but is not there in memory
|
|
void GetContext::MarkKeyMayExist() {
|
|
state_ = kFound;
|
|
if (value_found_ != nullptr) {
|
|
*value_found_ = false;
|
|
}
|
|
}
|
|
|
|
void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
|
|
assert(state_ == kNotFound);
|
|
assert(ucmp_->timestamp_size() == 0);
|
|
|
|
appendToReplayLog(kTypeValue, value, Slice());
|
|
|
|
state_ = kFound;
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
pinnable_val_->PinSelf(value);
|
|
}
|
|
}
|
|
|
|
void GetContext::ReportCounters() {
|
|
if (get_context_stats_.num_cache_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_index_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
|
|
get_context_stats_.num_cache_index_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_data_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
|
|
get_context_stats_.num_cache_data_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
|
|
get_context_stats_.num_cache_filter_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_hit > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
|
|
get_context_stats_.num_cache_compression_dict_hit);
|
|
}
|
|
if (get_context_stats_.num_cache_index_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
|
|
get_context_stats_.num_cache_index_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
|
|
get_context_stats_.num_cache_filter_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_data_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
|
|
get_context_stats_.num_cache_data_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
|
|
get_context_stats_.num_cache_compression_dict_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_bytes_read > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
|
|
get_context_stats_.num_cache_bytes_read);
|
|
}
|
|
if (get_context_stats_.num_cache_miss > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_MISS,
|
|
get_context_stats_.num_cache_miss);
|
|
}
|
|
if (get_context_stats_.num_cache_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
|
|
}
|
|
if (get_context_stats_.num_cache_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_bytes_write > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
|
|
get_context_stats_.num_cache_bytes_write);
|
|
}
|
|
if (get_context_stats_.num_cache_index_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
|
|
get_context_stats_.num_cache_index_add);
|
|
}
|
|
if (get_context_stats_.num_cache_index_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_index_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_index_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
|
|
get_context_stats_.num_cache_index_bytes_insert);
|
|
}
|
|
if (get_context_stats_.num_cache_data_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
|
|
get_context_stats_.num_cache_data_add);
|
|
}
|
|
if (get_context_stats_.num_cache_data_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_data_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_data_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
|
|
get_context_stats_.num_cache_data_bytes_insert);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
|
|
get_context_stats_.num_cache_filter_add);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_filter_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
|
|
get_context_stats_.num_cache_filter_bytes_insert);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_add > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
|
|
get_context_stats_.num_cache_compression_dict_add);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
|
|
get_context_stats_.num_cache_compression_dict_add_redundant);
|
|
}
|
|
if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
|
|
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
|
|
get_context_stats_.num_cache_compression_dict_bytes_insert);
|
|
}
|
|
}
|
|
|
|
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
|
|
const Slice& value, bool* matched,
|
|
Status* read_status, Cleanable* value_pinner) {
|
|
assert(matched);
|
|
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
|
|
merge_context_ != nullptr);
|
|
if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
|
|
*matched = true;
|
|
// If the value is not in the snapshot, skip it
|
|
if (!CheckCallback(parsed_key.sequence)) {
|
|
return true; // to continue to the next seq
|
|
}
|
|
|
|
if (seq_ != nullptr) {
|
|
// Set the sequence number if it is uninitialized
|
|
if (*seq_ == kMaxSequenceNumber) {
|
|
*seq_ = parsed_key.sequence;
|
|
}
|
|
if (max_covering_tombstone_seq_) {
|
|
*seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
|
|
}
|
|
}
|
|
|
|
size_t ts_sz = ucmp_->timestamp_size();
|
|
Slice ts;
|
|
|
|
if (ts_sz > 0) {
|
|
// ensure always have ts if cf enables ts.
|
|
ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
|
|
if (timestamp_ != nullptr) {
|
|
if (!timestamp_->empty()) {
|
|
assert(ts_sz == timestamp_->size());
|
|
// `timestamp` can be set before `SaveValue` is ever called
|
|
// when max_covering_tombstone_seq_ was set.
|
|
// If this key has a higher sequence number than range tombstone,
|
|
// then timestamp should be updated. `ts_from_rangetombstone_` is
|
|
// set to false afterwards so that only the key with highest seqno
|
|
// updates the timestamp.
|
|
if (ts_from_rangetombstone_) {
|
|
assert(max_covering_tombstone_seq_);
|
|
if (parsed_key.sequence > *max_covering_tombstone_seq_) {
|
|
timestamp_->assign(ts.data(), ts.size());
|
|
ts_from_rangetombstone_ = false;
|
|
}
|
|
}
|
|
}
|
|
// TODO optimize for small size ts
|
|
const std::string kMaxTs(ts_sz, '\xff');
|
|
if (timestamp_->empty() ||
|
|
ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
|
|
timestamp_->assign(ts.data(), ts.size());
|
|
}
|
|
}
|
|
}
|
|
appendToReplayLog(parsed_key.type, value, ts);
|
|
|
|
auto type = parsed_key.type;
|
|
Slice unpacked_value = value;
|
|
// Key matches. Process it
|
|
if ((type == kTypeValue || type == kTypeValuePreferredSeqno ||
|
|
type == kTypeMerge || type == kTypeBlobIndex ||
|
|
type == kTypeWideColumnEntity || type == kTypeDeletion ||
|
|
type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
|
|
max_covering_tombstone_seq_ != nullptr &&
|
|
*max_covering_tombstone_seq_ > parsed_key.sequence) {
|
|
// Note that deletion types are also considered, this is for the case
|
|
// when we need to return timestamp to user. If a range tombstone has a
|
|
// higher seqno than point tombstone, its timestamp should be returned.
|
|
type = kTypeRangeDeletion;
|
|
}
|
|
switch (type) {
|
|
case kTypeValue:
|
|
case kTypeValuePreferredSeqno:
|
|
case kTypeBlobIndex:
|
|
case kTypeWideColumnEntity:
|
|
assert(state_ == kNotFound || state_ == kMerge);
|
|
if (type == kTypeValuePreferredSeqno) {
|
|
unpacked_value = ParsePackedValueForValue(value);
|
|
}
|
|
if (type == kTypeBlobIndex) {
|
|
if (is_blob_index_ == nullptr) {
|
|
// Blob value not supported. Stop.
|
|
state_ = kUnexpectedBlobIndex;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (is_blob_index_ != nullptr) {
|
|
*is_blob_index_ = (type == kTypeBlobIndex);
|
|
}
|
|
|
|
if (kNotFound == state_) {
|
|
state_ = kFound;
|
|
if (do_merge_) {
|
|
if (type == kTypeBlobIndex && ucmp_->timestamp_size() != 0) {
|
|
ukey_with_ts_found_.PinSelf(parsed_key.user_key);
|
|
}
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
Slice value_to_use = unpacked_value;
|
|
|
|
if (type == kTypeWideColumnEntity) {
|
|
Slice value_copy = unpacked_value;
|
|
|
|
if (!WideColumnSerialization::GetValueOfDefaultColumn(
|
|
value_copy, value_to_use)
|
|
.ok()) {
|
|
state_ = kCorrupt;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (LIKELY(value_pinner != nullptr)) {
|
|
// If the backing resources for the value are provided, pin them
|
|
pinnable_val_->PinSlice(value_to_use, value_pinner);
|
|
} else {
|
|
TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
|
|
this);
|
|
// Otherwise copy the value
|
|
pinnable_val_->PinSelf(value_to_use);
|
|
}
|
|
} else if (columns_ != nullptr) {
|
|
if (type == kTypeWideColumnEntity) {
|
|
if (!columns_->SetWideColumnValue(unpacked_value, value_pinner)
|
|
.ok()) {
|
|
state_ = kCorrupt;
|
|
return false;
|
|
}
|
|
} else {
|
|
columns_->SetPlainValue(unpacked_value, value_pinner);
|
|
}
|
|
}
|
|
} else {
|
|
// It means this function is called as part of DB GetMergeOperands
|
|
// API and the current value should be part of
|
|
// merge_context_->operand_list
|
|
if (type == kTypeBlobIndex) {
|
|
PinnableSlice pin_val;
|
|
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
|
|
read_status) == false) {
|
|
return false;
|
|
}
|
|
Slice blob_value(pin_val);
|
|
push_operand(blob_value, nullptr);
|
|
} else if (type == kTypeWideColumnEntity) {
|
|
Slice value_copy = unpacked_value;
|
|
Slice value_of_default;
|
|
|
|
if (!WideColumnSerialization::GetValueOfDefaultColumn(
|
|
value_copy, value_of_default)
|
|
.ok()) {
|
|
state_ = kCorrupt;
|
|
return false;
|
|
}
|
|
|
|
push_operand(value_of_default, value_pinner);
|
|
} else {
|
|
assert(type == kTypeValue || type == kTypeValuePreferredSeqno);
|
|
push_operand(unpacked_value, value_pinner);
|
|
}
|
|
}
|
|
} else if (kMerge == state_) {
|
|
assert(merge_operator_ != nullptr);
|
|
if (type == kTypeBlobIndex) {
|
|
PinnableSlice pin_val;
|
|
if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
|
|
read_status) == false) {
|
|
return false;
|
|
}
|
|
Slice blob_value(pin_val);
|
|
state_ = kFound;
|
|
if (do_merge_) {
|
|
MergeWithPlainBaseValue(blob_value);
|
|
} else {
|
|
// It means this function is called as part of DB GetMergeOperands
|
|
// API and the current value should be part of
|
|
// merge_context_->operand_list
|
|
push_operand(blob_value, nullptr);
|
|
}
|
|
} else if (type == kTypeWideColumnEntity) {
|
|
state_ = kFound;
|
|
|
|
if (do_merge_) {
|
|
MergeWithWideColumnBaseValue(unpacked_value);
|
|
} else {
|
|
// It means this function is called as part of DB GetMergeOperands
|
|
// API and the current value should be part of
|
|
// merge_context_->operand_list
|
|
Slice value_copy = unpacked_value;
|
|
Slice value_of_default;
|
|
|
|
if (!WideColumnSerialization::GetValueOfDefaultColumn(
|
|
value_copy, value_of_default)
|
|
.ok()) {
|
|
state_ = kCorrupt;
|
|
return false;
|
|
}
|
|
|
|
push_operand(value_of_default, value_pinner);
|
|
}
|
|
} else {
|
|
assert(type == kTypeValue || type == kTypeValuePreferredSeqno);
|
|
|
|
state_ = kFound;
|
|
if (do_merge_) {
|
|
MergeWithPlainBaseValue(unpacked_value);
|
|
} else {
|
|
// It means this function is called as part of DB GetMergeOperands
|
|
// API and the current value should be part of
|
|
// merge_context_->operand_list
|
|
push_operand(unpacked_value, value_pinner);
|
|
}
|
|
}
|
|
}
|
|
return false;
|
|
|
|
case kTypeDeletion:
|
|
case kTypeDeletionWithTimestamp:
|
|
case kTypeSingleDeletion:
|
|
case kTypeRangeDeletion:
|
|
// TODO(noetzli): Verify correctness once merge of single-deletes
|
|
// is supported
|
|
assert(state_ == kNotFound || state_ == kMerge);
|
|
if (kNotFound == state_) {
|
|
state_ = kDeleted;
|
|
} else if (kMerge == state_) {
|
|
state_ = kFound;
|
|
if (do_merge_) {
|
|
MergeWithNoBaseValue();
|
|
}
|
|
// If do_merge_ = false then the current value shouldn't be part of
|
|
// merge_context_->operand_list
|
|
}
|
|
return false;
|
|
|
|
case kTypeMerge:
|
|
assert(state_ == kNotFound || state_ == kMerge);
|
|
state_ = kMerge;
|
|
// value_pinner is not set from plain_table_reader.cc for example.
|
|
push_operand(value, value_pinner);
|
|
PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1);
|
|
|
|
if (do_merge_ && merge_operator_ != nullptr &&
|
|
merge_operator_->ShouldMerge(
|
|
merge_context_->GetOperandsDirectionBackward())) {
|
|
state_ = kFound;
|
|
MergeWithNoBaseValue();
|
|
return false;
|
|
}
|
|
if (merge_context_->get_merge_operands_options != nullptr &&
|
|
merge_context_->get_merge_operands_options->continue_cb !=
|
|
nullptr &&
|
|
!merge_context_->get_merge_operands_options->continue_cb(value)) {
|
|
state_ = kFound;
|
|
return false;
|
|
}
|
|
return true;
|
|
|
|
default:
|
|
assert(false);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// state_ could be Corrupt, merge or notfound
|
|
return false;
|
|
}
|
|
|
|
void GetContext::PostprocessMerge(const Status& merge_status) {
|
|
if (!merge_status.ok()) {
|
|
if (merge_status.subcode() == Status::SubCode::kMergeOperatorFailed) {
|
|
state_ = kMergeOperatorFailed;
|
|
} else {
|
|
state_ = kCorrupt;
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (LIKELY(pinnable_val_ != nullptr)) {
|
|
pinnable_val_->PinSelf();
|
|
}
|
|
}
|
|
|
|
void GetContext::MergeWithNoBaseValue() {
|
|
assert(do_merge_);
|
|
assert(pinnable_val_ || columns_);
|
|
assert(!pinnable_val_ || !columns_);
|
|
|
|
// `op_failure_scope` (an output parameter) is not provided (set to nullptr)
|
|
// since a failure must be propagated regardless of its value.
|
|
const Status s = MergeHelper::TimedFullMerge(
|
|
merge_operator_, user_key_, MergeHelper::kNoBaseValue,
|
|
merge_context_->GetOperands(), logger_, statistics_, clock_,
|
|
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
|
|
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
|
|
PostprocessMerge(s);
|
|
}
|
|
|
|
void GetContext::MergeWithPlainBaseValue(const Slice& value) {
|
|
assert(do_merge_);
|
|
assert(pinnable_val_ || columns_);
|
|
assert(!pinnable_val_ || !columns_);
|
|
|
|
// `op_failure_scope` (an output parameter) is not provided (set to nullptr)
|
|
// since a failure must be propagated regardless of its value.
|
|
const Status s = MergeHelper::TimedFullMerge(
|
|
merge_operator_, user_key_, MergeHelper::kPlainBaseValue, value,
|
|
merge_context_->GetOperands(), logger_, statistics_, clock_,
|
|
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
|
|
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
|
|
PostprocessMerge(s);
|
|
}
|
|
|
|
void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
|
|
assert(do_merge_);
|
|
assert(pinnable_val_ || columns_);
|
|
assert(!pinnable_val_ || !columns_);
|
|
|
|
// `op_failure_scope` (an output parameter) is not provided (set to nullptr)
|
|
// since a failure must be propagated regardless of its value.
|
|
const Status s = MergeHelper::TimedFullMerge(
|
|
merge_operator_, user_key_, MergeHelper::kWideBaseValue, entity,
|
|
merge_context_->GetOperands(), logger_, statistics_, clock_,
|
|
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
|
|
pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
|
|
PostprocessMerge(s);
|
|
}
|
|
|
|
bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
|
|
PinnableSlice* blob_value, Status* read_status) {
|
|
constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
|
|
constexpr uint64_t* bytes_read = nullptr;
|
|
|
|
*read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
|
|
blob_value, bytes_read);
|
|
if (!read_status->ok()) {
|
|
if (read_status->IsIncomplete()) {
|
|
// FIXME: this code is not covered by unit tests
|
|
MarkKeyMayExist();
|
|
return false;
|
|
}
|
|
state_ = kCorrupt;
|
|
return false;
|
|
}
|
|
*is_blob_index_ = false;
|
|
return true;
|
|
}
|
|
|
|
void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
|
|
// TODO(yanqin) preserve timestamps information in merge_context
|
|
if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
|
|
value_pinner != nullptr) {
|
|
value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
|
|
merge_context_->PushOperand(value, true /*value_pinned*/);
|
|
} else {
|
|
merge_context_->PushOperand(value, false);
|
|
}
|
|
}
|
|
|
|
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
|
|
GetContext* get_context, Cleanable* value_pinner,
|
|
SequenceNumber seq_no) {
|
|
Slice s = replay_log;
|
|
Slice ts;
|
|
size_t ts_sz = get_context->TimestampSize();
|
|
bool ret = false;
|
|
|
|
while (s.size()) {
|
|
auto type = static_cast<ValueType>(*s.data());
|
|
s.remove_prefix(1);
|
|
Slice value;
|
|
ret = GetLengthPrefixedSlice(&s, &value);
|
|
assert(ret);
|
|
|
|
bool dont_care __attribute__((__unused__));
|
|
|
|
// Use a copy to prevent modifying user_key. Modification of user_key
|
|
// could result to potential cache miss.
|
|
std::string user_key_str = user_key.ToString();
|
|
ParsedInternalKey ikey = ParsedInternalKey(user_key_str, seq_no, type);
|
|
|
|
// If ts enabled for current cf, there will always be ts appended after each
|
|
// piece of value.
|
|
if (ts_sz > 0) {
|
|
ret = GetLengthPrefixedSlice(&s, &ts);
|
|
assert(ts_sz == ts.size());
|
|
assert(ret);
|
|
ikey.SetTimestamp(ts);
|
|
}
|
|
|
|
(void)ret;
|
|
|
|
Status read_status;
|
|
get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
|
|
if (!read_status.ok()) {
|
|
return read_status;
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|