2024-11-12 17:27:11 +00:00
|
|
|
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
|
|
|
|
#include "memtable/wbwi_memtable.h"
|
|
|
|
|
|
|
|
#include "db/memtable.h"
|
|
|
|
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
|
|
|
|
const std::unordered_map<WriteType, ValueType>
|
|
|
|
WBWIMemTableIterator::WriteTypeToValueTypeMap = {
|
|
|
|
{kPutRecord, kTypeValue},
|
|
|
|
{kMergeRecord, kTypeMerge},
|
|
|
|
{kDeleteRecord, kTypeDeletion},
|
|
|
|
{kSingleDeleteRecord, kTypeSingleDeletion},
|
|
|
|
{kDeleteRangeRecord, kTypeRangeDeletion},
|
|
|
|
{kPutEntityRecord, kTypeWideColumnEntity},
|
|
|
|
// Only the above record types are added to WBWI.
|
|
|
|
// kLogDataRecord, kXIDRecord, kUnknownRecord
|
|
|
|
};
|
|
|
|
|
|
|
|
bool WBWIMemTable::Get(const LookupKey& key, std::string* value,
|
|
|
|
PinnableWideColumns* columns, std::string* timestamp,
|
|
|
|
Status* s, MergeContext* merge_context,
|
|
|
|
SequenceNumber* max_covering_tombstone_seq,
|
|
|
|
SequenceNumber* out_seq, const ReadOptions&,
|
|
|
|
bool immutable_memtable, ReadCallback* callback,
|
|
|
|
bool* is_blob_index, bool do_merge) {
|
|
|
|
(void)immutable_memtable;
|
|
|
|
(void)timestamp;
|
|
|
|
(void)columns;
|
|
|
|
assert(immutable_memtable);
|
|
|
|
assert(!timestamp); // TODO: support UDT
|
|
|
|
assert(!columns); // TODO: support WideColumn
|
|
|
|
assert(global_seqno_ != kMaxSequenceNumber);
|
|
|
|
// WBWI does not support DeleteRange yet.
|
|
|
|
assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
|
|
|
|
|
|
|
|
[[maybe_unused]] SequenceNumber read_seq =
|
|
|
|
GetInternalKeySeqno(key.internal_key());
|
|
|
|
std::unique_ptr<InternalIterator> iter{NewIterator()};
|
|
|
|
iter->Seek(key.internal_key());
|
|
|
|
const Slice lookup_user_key = key.user_key();
|
|
|
|
|
|
|
|
while (iter->Valid() && comparator_->EqualWithoutTimestamp(
|
|
|
|
ExtractUserKey(iter->key()), lookup_user_key)) {
|
|
|
|
uint64_t tag = ExtractInternalKeyFooter(iter->key());
|
|
|
|
ValueType type;
|
|
|
|
SequenceNumber seq;
|
|
|
|
UnPackSequenceAndType(tag, &seq, &type);
|
|
|
|
// Unsupported operations.
|
|
|
|
assert(type != kTypeBlobIndex);
|
|
|
|
assert(type != kTypeWideColumnEntity);
|
|
|
|
assert(type != kTypeValuePreferredSeqno);
|
|
|
|
assert(type != kTypeDeletionWithTimestamp);
|
|
|
|
assert(type != kTypeMerge);
|
|
|
|
if (!callback || callback->IsVisible(seq)) {
|
|
|
|
if (*out_seq == kMaxSequenceNumber) {
|
|
|
|
*out_seq = std::max(seq, *max_covering_tombstone_seq);
|
|
|
|
}
|
|
|
|
if (*max_covering_tombstone_seq > seq) {
|
|
|
|
type = kTypeRangeDeletion;
|
|
|
|
}
|
|
|
|
switch (type) {
|
|
|
|
case kTypeValue: {
|
|
|
|
HandleTypeValue(lookup_user_key, iter->value(), iter->IsValuePinned(),
|
|
|
|
do_merge, s->IsMergeInProgress(), merge_context,
|
|
|
|
moptions_.merge_operator, clock_,
|
|
|
|
moptions_.statistics, moptions_.info_log, s, value,
|
|
|
|
columns, is_blob_index);
|
|
|
|
assert(seq <= read_seq);
|
|
|
|
return /*found_final_value=*/true;
|
|
|
|
}
|
|
|
|
case kTypeDeletion:
|
|
|
|
case kTypeSingleDeletion:
|
|
|
|
case kTypeRangeDeletion: {
|
|
|
|
HandleTypeDeletion(lookup_user_key, s->IsMergeInProgress(),
|
|
|
|
merge_context, moptions_.merge_operator, clock_,
|
|
|
|
moptions_.statistics, moptions_.info_log, s, value,
|
|
|
|
columns);
|
|
|
|
assert(seq <= read_seq);
|
|
|
|
return /*found_final_value=*/true;
|
|
|
|
}
|
|
|
|
default: {
|
|
|
|
std::string msg("Unrecognized or unsupported value type: " +
|
|
|
|
std::to_string(static_cast<int>(type)) + ". ");
|
|
|
|
msg.append("User key: " +
|
|
|
|
ExtractUserKey(iter->key()).ToString(/*hex=*/true) + ". ");
|
|
|
|
msg.append("seq: " + std::to_string(seq) + ".");
|
|
|
|
*s = Status::Corruption(msg.c_str());
|
|
|
|
return /*found_final_value=*/true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Current key not visible or we read a merge key
|
|
|
|
assert(s->IsMergeInProgress() || (callback && !callback->IsVisible(seq)));
|
|
|
|
iter->Next();
|
|
|
|
}
|
|
|
|
if (!iter->status().ok() &&
|
|
|
|
(s->ok() || s->IsMergeInProgress() || s->IsNotFound())) {
|
|
|
|
*s = iter->status();
|
|
|
|
// stop further look up
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
return /*found_final_value=*/false;
|
|
|
|
}
|
|
|
|
|
|
|
|
void WBWIMemTable::MultiGet(const ReadOptions& read_options,
|
|
|
|
MultiGetRange* range, ReadCallback* callback,
|
|
|
|
bool immutable_memtable) {
|
|
|
|
(void)immutable_memtable;
|
|
|
|
// Should only be used as immutable memtable.
|
|
|
|
assert(immutable_memtable);
|
|
|
|
// TODO: reuse the InternalIterator created in Get().
|
|
|
|
for (auto iter = range->begin(); iter != range->end(); ++iter) {
|
Fix a valgrind unit test failure (#13137)
Summary:
fix the valgrind failure from https://github.com/facebook/rocksdb/actions/runs/11813904728/job/32911902535?fbclid=IwZXh0bgNhZW0CMTEAAR2GJs1U6mNwNv3zwPzU8rpCmBHqfStV3dupj2o_-686RneLKXADaSZH5-U_aem_ADUQy7bzknoseVpjrOc5SQ
```
[ RUN ] WBWIMemTableTest.ReadFromWBWIMemtable
==1150870== Conditional jump or move depends on uninitialised value(s)
==1150870== at 0x50FE67A: rocksdb::WBWIMemTable::Get(rocksdb::LookupKey const&, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, rocksdb::PinnableWideColumns*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >*, rocksdb::Status*, rocksdb::MergeContext*, unsigned long*, unsigned long*, rocksdb::ReadOptions const&, bool, rocksdb::ReadCallback*, bool*, bool) (wbwi_memtable.cc:60)
==1150870== by 0x50FF92A: rocksdb::WBWIMemTable::MultiGet(rocksdb::ReadOptions const&, rocksdb::MultiGetContext::Range*, rocksdb::ReadCallback*, bool) (wbwi_memtable.cc:120)
==1150870== by 0x1879EF: rocksdb::WBWIMemTableTest_ReadFromWBWIMemtable_Test::TestBody() (write_batch_with_index_test.cc:3580)
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13137
Test Plan: `valgrind ./write_batch_with_index_test --gtest_filter="*ReadFromWBWIMemtable*"`
Reviewed By: ltamasi
Differential Revision: D65892657
Pulled By: cbi42
fbshipit-source-id: 0b44a5a06b8cc64173ad36966339877e2f508d52
2024-11-13 20:41:56 +00:00
|
|
|
SequenceNumber dummy_seq = 0;
|
2024-11-12 17:27:11 +00:00
|
|
|
bool found_final_value =
|
|
|
|
Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr,
|
|
|
|
iter->columns, iter->timestamp, iter->s, &(iter->merge_context),
|
|
|
|
&(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true,
|
|
|
|
callback, nullptr, true);
|
|
|
|
if (found_final_value) {
|
|
|
|
if (iter->s->ok() || iter->s->IsNotFound()) {
|
|
|
|
if (iter->value) {
|
|
|
|
iter->value->PinSelf();
|
|
|
|
range->AddValueSize(iter->value->size());
|
|
|
|
} else {
|
|
|
|
assert(iter->columns);
|
|
|
|
range->AddValueSize(iter->columns->serialized_size());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
range->MarkKeyDone(iter);
|
|
|
|
if (range->GetValueSize() > read_options.value_size_soft_limit) {
|
|
|
|
// Set all remaining keys in range to Abort
|
|
|
|
for (auto range_iter = range->begin(); range_iter != range->end();
|
|
|
|
++range_iter) {
|
|
|
|
range->MarkKeyDone(range_iter);
|
|
|
|
*(range_iter->s) = Status::Aborted();
|
|
|
|
}
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|