mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 11:43:49 +00:00
bf98dcf9a8
Summary: The original goal is to propagate failures from `GetContext::SaveValue()` -> `GetContext::GetBlobValue()` -> `BlobFetcher::FetchBlob()` up to the user. This call sequence happens when a merge chain ends with a base value in a blob file. There are also fixes for bugs encountered along the way where non-ok statuses were ignored/overwritten, and a bit of plumbing work for functions that had no capability to return a status. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12462 Test Plan: A repro command ``` db=/dev/shm/dbstress_db ; exp=/dev/shm/dbstress_exp ; rm -rf $db $exp ; mkdir -p $db $exp ./db_stress \ --clear_column_family_one_in=0 \ --test_batches_snapshots=0 \ --write_fault_one_in=0 \ --use_put_entity_one_in=0 \ --prefixpercent=0 \ --read_fault_one_in=0 \ --readpercent=0 \ --reopen=0 \ --set_options_one_in=10000 \ --delpercent=0 \ --delrangepercent=0 \ --open_metadata_write_fault_one_in=0 \ --open_read_fault_one_in=0 \ --open_write_fault_one_in=0 \ --destroy_db_initially=0 \ --ingest_external_file_one_in=0 \ --iterpercent=0 \ --nooverwritepercent=0 \ --db=$db \ --enable_blob_files=1 \ --expected_values_dir=$exp \ --max_background_compactions=20 \ --max_bytes_for_level_base=2097152 \ --max_key=100000 \ --min_blob_size=0 \ --open_files=-1 \ --ops_per_thread=100000000 \ --prefix_size=-1 \ --target_file_size_base=524288 \ --use_merge=1 \ --value_size_mult=32 \ --write_buffer_size=524288 \ --writepercent=100 ``` It used to fail like: ``` ... 
frame https://github.com/facebook/rocksdb/issues/9: 0x00007fc63903bc93 libc.so.6`__GI___assert_fail(assertion="HasDefaultColumn(columns)", file="fbcode/internal_repo_rocksdb/repo/db/wide/wide_columns_helper.h", line=33, function="static const rocksdb::Slice &rocksdb::WideColumnsHelper::GetDefaultColumn(const rocksdb::WideColumns &)") at assert.c:101:3 frame https://github.com/facebook/rocksdb/issues/10: 0x00000000006f7e92 db_stress`rocksdb::Version::Get(rocksdb::ReadOptions const&, rocksdb::LookupKey const&, rocksdb::PinnableSlice*, rocksdb::PinnableWideColumns*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char>>*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, rocksdb::PinnedIteratorsManager*, bool*, bool*, unsigned long*, rocksdb::ReadCallback*, bool*, bool) [inlined] rocksdb::WideColumnsHelper::GetDefaultColumn(columns=size=0) at wide_columns_helper.h:33 frame https://github.com/facebook/rocksdb/issues/11: 0x00000000006f7e76 db_stress`rocksdb::Version::Get(this=0x00007fc5ec763000, read_options=<unavailable>, k=<unavailable>, value=0x0000000000000000, columns=0x00007fc6035fd1d8, timestamp=<unavailable>, status=0x00007fc6035fd250, merge_context=0x00007fc6035fce40, max_covering_tombstone_seq=0x00007fc6035fce90, pinned_iters_mgr=0x00007fc6035fcdf0, value_found=0x0000000000000000, key_exists=0x0000000000000000, seq=0x0000000000000000, callback=0x0000000000000000, is_blob=0x0000000000000000, do_merge=<unavailable>) at version_set.cc:2492 frame https://github.com/facebook/rocksdb/issues/12: 0x000000000051e245 db_stress`rocksdb::DBImpl::GetImpl(this=0x00007fc637a86000, read_options=0x00007fc6035fcf60, key=<unavailable>, get_impl_options=0x00007fc6035fd000) at db_impl.cc:2408 frame https://github.com/facebook/rocksdb/issues/13: 0x000000000050cec2 db_stress`rocksdb::DBImpl::GetEntity(this=0x00007fc637a86000, _read_options=<unavailable>, column_family=<unavailable>, key=0x00007fc6035fd3c8, columns=0x00007fc6035fd1d8) at 
db_impl.cc:2109 frame https://github.com/facebook/rocksdb/issues/14: 0x000000000074f688 db_stress`rocksdb::(anonymous namespace)::MemTableInserter::MergeCF(this=0x00007fc6035fd450, column_family_id=2, key=0x00007fc6035fd3c8, value=0x00007fc6035fd3a0) at write_batch.cc:2656 frame https://github.com/facebook/rocksdb/issues/15: 0x00000000007476fc db_stress`rocksdb::WriteBatchInternal::Iterate(wb=0x00007fc6035fe698, handler=0x00007fc6035fd450, begin=12, end=<unavailable>) at write_batch.cc:607 frame https://github.com/facebook/rocksdb/issues/16: 0x000000000074d7dd db_stress`rocksdb::WriteBatchInternal::InsertInto(rocksdb::WriteThread::WriteGroup&, unsigned long, rocksdb::ColumnFamilyMemTables*, rocksdb::FlushScheduler*, rocksdb::TrimHistoryScheduler*, bool, unsigned long, rocksdb::DB*, bool, bool, bool) [inlined] rocksdb::WriteBatch::Iterate(this=<unavailable>, handler=0x00007fc6035fd450) const at write_batch.cc:505 frame https://github.com/facebook/rocksdb/issues/17: 0x000000000074d77b db_stress`rocksdb::WriteBatchInternal::InsertInto(write_group=<unavailable>, sequence=<unavailable>, memtables=<unavailable>, flush_scheduler=<unavailable>, trim_history_scheduler=<unavailable>, ignore_missing_column_families=<unavailable>, recovery_log_number=0, db=0x00007fc637a86000, concurrent_memtable_writes=<unavailable>, seq_per_batch=false, batch_per_txn=<unavailable>) at write_batch.cc:3084 frame https://github.com/facebook/rocksdb/issues/18: 0x0000000000631d77 db_stress`rocksdb::DBImpl::PipelinedWriteImpl(this=0x00007fc637a86000, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000) at db_impl_write.cc:807 frame https://github.com/facebook/rocksdb/issues/19: 0x000000000062ceeb db_stress`rocksdb::DBImpl::WriteImpl(this=<unavailable>, write_options=<unavailable>, my_batch=0x00007fc6035fe698, callback=0x0000000000000000, log_used=<unavailable>, 
log_ref=0, disable_memtable=<unavailable>, seq_used=0x0000000000000000, batch_cnt=0, pre_release_callback=0x0000000000000000, post_memtable_callback=0x0000000000000000) at db_impl_write.cc:312 frame https://github.com/facebook/rocksdb/issues/20: 0x000000000062c8ec db_stress`rocksdb::DBImpl::Write(this=0x00007fc637a86000, write_options=0x00007fc6035feca8, my_batch=0x00007fc6035fe698) at db_impl_write.cc:157 frame https://github.com/facebook/rocksdb/issues/21: 0x000000000062b847 db_stress`rocksdb::DB::Merge(this=0x00007fc637a86000, opt=0x00007fc6035feca8, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, value=0x00007fc6035fe830) at db_impl_write.cc:2544 frame https://github.com/facebook/rocksdb/issues/22: 0x000000000062b6ef db_stress`rocksdb::DBImpl::Merge(this=0x00007fc637a86000, o=<unavailable>, column_family=0x00007fc6370bf140, key=0x00007fc6035fe8d8, val=0x00007fc6035fe830) at db_impl_write.cc:72 frame https://github.com/facebook/rocksdb/issues/23: 0x00000000004d6397 db_stress`rocksdb::NonBatchedOpsStressTest::TestPut(this=0x00007fc637041000, thread=0x00007fc6370dbc00, write_opts=0x00007fc6035feca8, read_opts=0x00007fc6035fe9c8, rand_column_families=<unavailable>, rand_keys=size=1, value={P\xe9_\x03\xc6\x7f\0\0}) at no_batched_ops_stress.cc:1317 frame https://github.com/facebook/rocksdb/issues/24: 0x000000000049361d db_stress`rocksdb::StressTest::OperateDb(this=0x00007fc637041000, thread=0x00007fc6370dbc00) at db_stress_test_base.cc:1148 ... ``` Reviewed By: ltamasi Differential Revision: D55157795 Pulled By: ajkr fbshipit-source-id: 5f7c1380ead5794c29d41680028e34b839744764
260 lines
10 KiB
C++
260 lines
10 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#pragma once
|
|
#include <string>
|
|
|
|
#include "db/read_callback.h"
|
|
#include "rocksdb/types.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
// Forward declarations of types that this header only refers to through
// pointers or references in the declarations below.
class BlobFetcher;
class Comparator;
class Logger;
class MergeContext;
class MergeOperator;
class PinnableWideColumns;
class PinnedIteratorsManager;
class Statistics;
class SystemClock;
struct ParsedInternalKey;
|
|
|
|
// Data structure for accumulating statistics during a point lookup. At the
// end of the point lookup, the corresponding ticker stats are updated. This
// avoids the overhead of frequent ticker stats updates.
//
// Every counter starts at zero. The member order is kept exactly as declared
// originally so that the object layout does not change.
struct GetContextStats {
  // Hit counters.
  uint64_t num_cache_hit = 0;
  uint64_t num_cache_index_hit = 0;
  uint64_t num_cache_data_hit = 0;
  uint64_t num_cache_filter_hit = 0;
  uint64_t num_cache_compression_dict_hit = 0;

  // Miss counters and bytes read.
  uint64_t num_cache_index_miss = 0;
  uint64_t num_cache_filter_miss = 0;
  uint64_t num_cache_data_miss = 0;
  uint64_t num_cache_compression_dict_miss = 0;
  uint64_t num_cache_bytes_read = 0;
  uint64_t num_cache_miss = 0;

  // Add counters (overall, then per kind) and bytes written/inserted.
  uint64_t num_cache_add = 0;
  uint64_t num_cache_add_redundant = 0;
  uint64_t num_cache_bytes_write = 0;
  uint64_t num_cache_index_add = 0;
  uint64_t num_cache_index_add_redundant = 0;
  uint64_t num_cache_index_bytes_insert = 0;
  uint64_t num_cache_data_add = 0;
  uint64_t num_cache_data_add_redundant = 0;
  uint64_t num_cache_data_bytes_insert = 0;
  uint64_t num_cache_filter_add = 0;
  uint64_t num_cache_filter_add_redundant = 0;
  uint64_t num_cache_filter_bytes_insert = 0;
  uint64_t num_cache_compression_dict_add = 0;
  uint64_t num_cache_compression_dict_add_redundant = 0;
  uint64_t num_cache_compression_dict_bytes_insert = 0;

  // MultiGet stats.
  uint64_t num_filter_read = 0;
  uint64_t num_index_read = 0;
  uint64_t num_sst_read = 0;
};
|
|
|
|
// A class to hold context about a point lookup, such as pointer to value
|
|
// slice, key, merge context etc, as well as the current state of the
|
|
// lookup. Any user using GetContext to track the lookup result must call
|
|
// SaveValue() whenever the internal key is found. This can happen
|
|
// repeatedly in case of merge operands. In case the key may exist with
|
|
// high probability, but IO is required to confirm and the user doesn't allow
|
|
// it, MarkKeyMayExist() must be called instead of SaveValue().
|
|
class GetContext {
|
|
public:
|
|
// Current state of the point lookup. All except kNotFound and kMerge are
|
|
// terminal states
|
|
enum GetState {
|
|
kNotFound,
|
|
kFound,
|
|
kDeleted,
|
|
kCorrupt,
|
|
kMerge, // saver contains the current merge result (the operands)
|
|
kUnexpectedBlobIndex,
|
|
kMergeOperatorFailed,
|
|
};
|
|
GetContextStats get_context_stats_;
|
|
|
|
// Constructor
|
|
// @param value Holds the value corresponding to user_key. If its nullptr
|
|
// then return all merge operands corresponding to user_key
|
|
// via merge_context
|
|
// @param value_found If non-nullptr, set to false if key may be present
|
|
// but we can't be certain because we cannot do IO
|
|
// @param max_covering_tombstone_seq Pointer to highest sequence number of
|
|
// range deletion covering the key. When an internal key
|
|
// is found with smaller sequence number, the lookup
|
|
// terminates
|
|
// @param seq If non-nullptr, the sequence number of the found key will be
|
|
// saved here
|
|
// @param callback Pointer to ReadCallback to perform additional checks
|
|
// for visibility of a key
|
|
// @param is_blob_index If non-nullptr, will be used to indicate if a found
|
|
// key is of type blob index
|
|
// @param do_merge True if value associated with user_key has to be returned
|
|
// and false if all the merge operands associated with user_key has to be
|
|
// returned. Id do_merge=false then all the merge operands are stored in
|
|
// merge_context and they are never merged. The value pointer is untouched.
|
|
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
|
|
Logger* logger, Statistics* statistics, GetState init_state,
|
|
const Slice& user_key, PinnableSlice* value,
|
|
PinnableWideColumns* columns, bool* value_found,
|
|
MergeContext* merge_context, bool do_merge,
|
|
SequenceNumber* max_covering_tombstone_seq, SystemClock* clock,
|
|
SequenceNumber* seq = nullptr,
|
|
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
|
|
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
|
|
uint64_t tracing_get_id = 0, BlobFetcher* blob_fetcher = nullptr);
|
|
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
|
|
Logger* logger, Statistics* statistics, GetState init_state,
|
|
const Slice& user_key, PinnableSlice* value,
|
|
PinnableWideColumns* columns, std::string* timestamp,
|
|
bool* value_found, MergeContext* merge_context, bool do_merge,
|
|
SequenceNumber* max_covering_tombstone_seq, SystemClock* clock,
|
|
SequenceNumber* seq = nullptr,
|
|
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
|
|
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
|
|
uint64_t tracing_get_id = 0, BlobFetcher* blob_fetcher = nullptr);
|
|
|
|
GetContext() = delete;
|
|
|
|
// This can be called to indicate that a key may be present, but cannot be
|
|
// confirmed due to IO not allowed
|
|
void MarkKeyMayExist();
|
|
|
|
// Records this key, value, and any meta-data (such as sequence number and
|
|
// state) into this GetContext.
|
|
//
|
|
// If the parsed_key matches the user key that we are looking for, sets
|
|
// matched to true.
|
|
//
|
|
// Returns True if more keys need to be read (due to merges) or
|
|
// False if the complete value has been found.
|
|
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value,
|
|
bool* matched, Status* read_status,
|
|
Cleanable* value_pinner = nullptr);
|
|
|
|
// Simplified version of the previous function. Should only be used when we
|
|
// know that the operation is a Put.
|
|
void SaveValue(const Slice& value, SequenceNumber seq);
|
|
|
|
GetState State() const { return state_; }
|
|
|
|
SequenceNumber* max_covering_tombstone_seq() {
|
|
return max_covering_tombstone_seq_;
|
|
}
|
|
|
|
bool NeedTimestamp() { return timestamp_ != nullptr; }
|
|
|
|
inline size_t TimestampSize() { return ucmp_->timestamp_size(); }
|
|
|
|
void SetTimestampFromRangeTombstone(const Slice& timestamp) {
|
|
assert(timestamp_);
|
|
timestamp_->assign(timestamp.data(), timestamp.size());
|
|
ts_from_rangetombstone_ = true;
|
|
}
|
|
|
|
PinnedIteratorsManager* pinned_iters_mgr() { return pinned_iters_mgr_; }
|
|
|
|
// If a non-null string is passed, all the SaveValue calls will be
|
|
// logged into the string. The operations can then be replayed on
|
|
// another GetContext with replayGetContextLog.
|
|
void SetReplayLog(std::string* replay_log) { replay_log_ = replay_log; }
|
|
|
|
// Do we need to fetch the SequenceNumber for this key?
|
|
bool NeedToReadSequence() const { return (seq_ != nullptr); }
|
|
|
|
bool sample() const { return sample_; }
|
|
|
|
bool CheckCallback(SequenceNumber seq) {
|
|
if (callback_) {
|
|
return callback_->IsVisible(seq);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void ReportCounters();
|
|
|
|
bool has_callback() const { return callback_ != nullptr; }
|
|
|
|
const Slice& ukey_to_get_blob_value() const {
|
|
if (!ukey_with_ts_found_.empty()) {
|
|
return ukey_with_ts_found_;
|
|
} else {
|
|
return user_key_;
|
|
}
|
|
}
|
|
|
|
uint64_t get_tracing_get_id() const { return tracing_get_id_; }
|
|
|
|
void push_operand(const Slice& value, Cleanable* value_pinner);
|
|
|
|
private:
|
|
// Helper method that postprocesses the results of merge operations, e.g. it
|
|
// sets the state correctly upon merge errors.
|
|
void PostprocessMerge(const Status& merge_status);
|
|
|
|
// The following methods perform the actual merge operation for the
|
|
// no base value/plain base value/wide-column base value cases.
|
|
void MergeWithNoBaseValue();
|
|
void MergeWithPlainBaseValue(const Slice& value);
|
|
void MergeWithWideColumnBaseValue(const Slice& entity);
|
|
|
|
bool GetBlobValue(const Slice& user_key, const Slice& blob_index,
|
|
PinnableSlice* blob_value, Status* read_status);
|
|
|
|
void appendToReplayLog(ValueType type, Slice value, Slice ts);
|
|
|
|
const Comparator* ucmp_;
|
|
const MergeOperator* merge_operator_;
|
|
// the merge operations encountered;
|
|
Logger* logger_;
|
|
Statistics* statistics_;
|
|
|
|
GetState state_;
|
|
Slice user_key_;
|
|
// When a blob index is found with the user key containing timestamp,
|
|
// this copies the corresponding user key on record in the sst file
|
|
// and is later used for blob verification.
|
|
PinnableSlice ukey_with_ts_found_;
|
|
PinnableSlice* pinnable_val_;
|
|
PinnableWideColumns* columns_;
|
|
std::string* timestamp_;
|
|
bool ts_from_rangetombstone_{false};
|
|
bool* value_found_; // Is value set correctly? Used by KeyMayExist
|
|
MergeContext* merge_context_;
|
|
SequenceNumber* max_covering_tombstone_seq_;
|
|
SystemClock* clock_;
|
|
// If a key is found, seq_ will be set to the SequenceNumber of most recent
|
|
// write to the key or kMaxSequenceNumber if unknown
|
|
SequenceNumber* seq_;
|
|
std::string* replay_log_;
|
|
// Used to temporarily pin blocks when state_ == GetContext::kMerge
|
|
PinnedIteratorsManager* pinned_iters_mgr_;
|
|
ReadCallback* callback_;
|
|
bool sample_;
|
|
// Value is true if it's called as part of DB Get API and false if it's
|
|
// called as part of DB GetMergeOperands API. When it's false merge operators
|
|
// are never merged.
|
|
bool do_merge_;
|
|
bool* is_blob_index_;
|
|
// Used for block cache tracing only. A tracing get id uniquely identifies a
|
|
// Get or a MultiGet.
|
|
const uint64_t tracing_get_id_;
|
|
BlobFetcher* blob_fetcher_;
|
|
};
|
|
|
|
// Call this to replay a log and bring the get_context up to date. The replay
|
|
// log must have been created by another GetContext object, whose replay log
|
|
// must have been set by calling GetContext::SetReplayLog().
|
|
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
|
|
GetContext* get_context,
|
|
Cleanable* value_pinner = nullptr,
|
|
SequenceNumber seq_no = kMaxSequenceNumber);
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|