Introduce a WriteBatchWithIndex-based implementation of ReadOnlyMemTable (#13123)

Summary:
introduce the class WBWIMemTable that implements ReadOnlyMemTable interface with data stored in a WriteBatchWithIndex object.

This PR implements the main read path: Get, MultiGet and Iterator. It only supports Put, Delete and SingleDelete operations for now. All the keys in the WBWIMemTable will be assigned a global sequence number through WBWIMemTable::SetGlobalSequenceNumber().

Planned follow up PRs:
- Create WBWIMemTable with a transaction's WBWI and ingest it into a DB during Transaction::Commit()
- Support for Merge. This will be more complicated since we can have multiple updates with the same user key for Merge.
- Support for other operations like WideColumn and other ReadOnlyMemTable methods.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13123

Test Plan: * A mini-stress test for the read path is added as a new unit test

Reviewed By: jowlyzhang

Differential Revision: D65633419

Pulled By: cbi42

fbshipit-source-id: 0684fe47260b41f51ca39c300eb72ca5bc9c5a3b
This commit is contained in:
Changyu Bi 2024-11-12 09:27:11 -08:00 committed by Facebook GitHub Bot
parent 7cb6b93eee
commit 1c7652fcef
11 changed files with 857 additions and 63 deletions

View file

@ -779,6 +779,7 @@ set(SOURCES
memtable/hash_skiplist_rep.cc memtable/hash_skiplist_rep.cc
memtable/skiplistrep.cc memtable/skiplistrep.cc
memtable/vectorrep.cc memtable/vectorrep.cc
memtable/wbwi_memtable.cc
memtable/write_buffer_manager.cc memtable/write_buffer_manager.cc
monitoring/histogram.cc monitoring/histogram.cc
monitoring/histogram_windowing.cc monitoring/histogram_windowing.cc

View file

@ -149,6 +149,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"memtable/hash_skiplist_rep.cc", "memtable/hash_skiplist_rep.cc",
"memtable/skiplistrep.cc", "memtable/skiplistrep.cc",
"memtable/vectorrep.cc", "memtable/vectorrep.cc",
"memtable/wbwi_memtable.cc",
"memtable/write_buffer_manager.cc", "memtable/write_buffer_manager.cc",
"monitoring/histogram.cc", "monitoring/histogram.cc",
"monitoring/histogram_windowing.cc", "monitoring/histogram_windowing.cc",

View file

@ -400,7 +400,7 @@ Status ExternalSstFileIngestionJob::Run() {
// specific state of Memtables. The mutable Memtable should be empty, and the // specific state of Memtables. The mutable Memtable should be empty, and the
// immutable Memtable list should be empty. // immutable Memtable list should be empty.
if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 || if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 ||
super_version->mem->GetDataSize() != 0)) { !super_version->mem->IsEmpty())) {
return Status::TryAgain( return Status::TryAgain(
"Inconsistent memtable state detected when flushed before run."); "Inconsistent memtable state detected when flushed before run.");
} }

View file

@ -1251,47 +1251,16 @@ static bool SaveValue(void* arg, const char* entry) {
case kTypeValue: case kTypeValue:
case kTypeValuePreferredSeqno: { case kTypeValuePreferredSeqno: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length); Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
if (type == kTypeValuePreferredSeqno) { if (type == kTypeValuePreferredSeqno) {
v = ParsePackedValueForValue(v); v = ParsePackedValueForValue(v);
} }
*(s->status) = Status::OK(); ReadOnlyMemTable::HandleTypeValue(
s->key->user_key(), v, s->inplace_update_support == false,
if (!s->do_merge) { s->do_merge, *(s->merge_in_progress), merge_context,
// Preserve the value with the goal of returning it as part of s->merge_operator, s->clock, s->statistics, s->logger, s->status,
// raw merge operands to the user s->value, s->columns, s->is_blob_index);
// TODO(yanqin) update MergeContext so that timestamps information
// can also be retained.
merge_context->PushOperand(
v, s->inplace_update_support == false /* operand_pinned */);
} else if (*(s->merge_in_progress)) {
assert(s->do_merge);
if (s->value || s->columns) {
// `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its
// value.
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(),
MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(),
s->logger, s->statistics, s->clock,
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
s->value, s->columns);
}
} else if (s->value) {
s->value->assign(v.data(), v.size());
} else if (s->columns) {
s->columns->SetPlainValue(v);
}
*(s->found_final_value) = true; *(s->found_final_value) = true;
if (s->is_blob_index != nullptr) {
*(s->is_blob_index) = false;
}
return false; return false;
} }
case kTypeWideColumnEntity: { case kTypeWideColumnEntity: {
@ -1348,25 +1317,10 @@ static bool SaveValue(void* arg, const char* entry) {
case kTypeDeletionWithTimestamp: case kTypeDeletionWithTimestamp:
case kTypeSingleDeletion: case kTypeSingleDeletion:
case kTypeRangeDeletion: { case kTypeRangeDeletion: {
if (*(s->merge_in_progress)) { ReadOnlyMemTable::HandleTypeDeletion(
if (s->value || s->columns) { s->key->user_key(), *(s->merge_in_progress), s->merge_context,
// `op_failure_scope` (an output parameter) is not provided (set to s->merge_operator, s->clock, s->statistics, s->logger, s->status,
// nullptr) since a failure must be propagated regardless of its s->value, s->columns);
// value.
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue,
merge_context->GetOperands(), s->logger, s->statistics,
s->clock, /* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr, s->value, s->columns);
} else {
// We have found a final value (a base deletion) and have newer
// merge operands that we do not intend to merge. Nothing remains
// to be done so assign status to OK.
*(s->status) = Status::OK();
}
} else {
*(s->status) = Status::NotFound();
}
*(s->found_final_value) = true; *(s->found_final_value) = true;
return false; return false;
} }

View file

@ -18,6 +18,7 @@
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/kv_checksum.h" #include "db/kv_checksum.h"
#include "db/merge_helper.h"
#include "db/range_tombstone_fragmenter.h" #include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "db/seqno_to_time_mapping.h" #include "db/seqno_to_time_mapping.h"
@ -266,6 +267,11 @@ class ReadOnlyMemTable {
// operations on the same MemTable (unless this Memtable is immutable). // operations on the same MemTable (unless this Memtable is immutable).
virtual SequenceNumber GetFirstSequenceNumber() = 0; virtual SequenceNumber GetFirstSequenceNumber() = 0;
// Returns if there is no entry inserted to the mem table.
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
virtual bool IsEmpty() const = 0;
// Returns the sequence number that is guaranteed to be smaller than or equal // Returns the sequence number that is guaranteed to be smaller than or equal
// to the sequence number of any key that could be inserted into this // to the sequence number of any key that could be inserted into this
// memtable. It can then be assumed that any write with a larger(or equal) // memtable. It can then be assumed that any write with a larger(or equal)
@ -366,12 +372,76 @@ class ReadOnlyMemTable {
void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) { void SetFlushJobInfo(std::unique_ptr<FlushJobInfo>&& info) {
flush_job_info_ = std::move(info); flush_job_info_ = std::move(info);
}; }
std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() { std::unique_ptr<FlushJobInfo> ReleaseFlushJobInfo() {
return std::move(flush_job_info_); return std::move(flush_job_info_);
} }
static void HandleTypeValue(
const Slice& lookup_user_key, const Slice& value, bool value_pinned,
bool do_merge, bool merge_in_progress, MergeContext* merge_context,
const MergeOperator* merge_operator, SystemClock* clock,
Statistics* statistics, Logger* info_log, Status* s,
std::string* out_value, PinnableWideColumns* out_columns,
bool* is_blob_index) {
*s = Status::OK();
if (!do_merge) {
// Preserve the value with the goal of returning it as part of
// raw merge operands to the user
// TODO(yanqin) update MergeContext so that timestamps information
// can also be retained.
merge_context->PushOperand(value, value_pinned);
} else if (merge_in_progress) {
assert(do_merge);
// `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its
// value.
if (out_value || out_columns) {
*s = MergeHelper::TimedFullMerge(
merge_operator, lookup_user_key, MergeHelper::kPlainBaseValue,
value, merge_context->GetOperands(), info_log, statistics, clock,
/* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr, out_value, out_columns);
}
} else if (out_value) {
out_value->assign(value.data(), value.size());
} else if (out_columns) {
out_columns->SetPlainValue(value);
}
if (is_blob_index) {
*is_blob_index = false;
}
}
static void HandleTypeDeletion(
const Slice& lookup_user_key, bool merge_in_progress,
MergeContext* merge_context, const MergeOperator* merge_operator,
SystemClock* clock, Statistics* statistics, Logger* logger, Status* s,
std::string* out_value, PinnableWideColumns* out_columns) {
if (merge_in_progress) {
if (out_value || out_columns) {
// `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its
// value.
*s = MergeHelper::TimedFullMerge(
merge_operator, lookup_user_key, MergeHelper::kNoBaseValue,
merge_context->GetOperands(), logger, statistics, clock,
/* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr, out_value, out_columns);
} else {
// We have found a final value (a base deletion) and have newer
// merge operands that we do not intend to merge. Nothing remains
// to be done so assign status to OK.
*s = Status::OK();
}
} else {
*s = Status::NotFound();
}
}
protected: protected:
friend class MemTableList; friend class MemTableList;
@ -603,10 +673,7 @@ class MemTable final : public ReadOnlyMemTable {
} }
} }
// Returns if there is no entry inserted to the mem table. bool IsEmpty() const override { return first_seqno_ == 0; }
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
bool IsEmpty() const { return first_seqno_ == 0; }
SequenceNumber GetFirstSequenceNumber() override { SequenceNumber GetFirstSequenceNumber() override {
return first_seqno_.load(std::memory_order_relaxed); return first_seqno_.load(std::memory_order_relaxed);

View file

@ -188,8 +188,10 @@ class WriteBatchWithIndex : public WriteBatchBase {
// Create an iterator of a column family. User can call iterator.Seek() to // Create an iterator of a column family. User can call iterator.Seek() to
// search to the next entry of or after a key. Keys will be iterated in the // search to the next entry of or after a key. Keys will be iterated in the
// order given by index_comparator. For multiple updates on the same key, // order given by index_comparator. For multiple updates on the same key,
// each update will be returned as a separate entry, in the order of update // if overwrite_key=false, then each update will be returned as a separate
// time. // entry, in the order of update time.
// if overwrite_key=true, then one entry per key will be returned. Merge
// updates on the same key will be returned as separate entries.
// //
// The returned iterator should be deleted by the caller. // The returned iterator should be deleted by the caller.
WBWIIterator* NewIterator(ColumnFamilyHandle* column_family); WBWIIterator* NewIterator(ColumnFamilyHandle* column_family);
@ -354,6 +356,10 @@ class WriteBatchWithIndex : public WriteBatchBase {
friend class WriteUnpreparedTxn; friend class WriteUnpreparedTxn;
friend class WriteBatchWithIndex_SubBatchCnt_Test; friend class WriteBatchWithIndex_SubBatchCnt_Test;
friend class WriteBatchWithIndexInternal; friend class WriteBatchWithIndexInternal;
friend class WBWIMemTable;
WBWIIterator* NewIterator(uint32_t cf_id) const;
// Returns the number of sub-batches inside the write batch. A sub-batch // Returns the number of sub-batches inside the write batch. A sub-batch
// starts right before inserting a key that is a duplicate of a key in the // starts right before inserting a key that is a duplicate of a key in the
// last sub-batch. // last sub-batch.

147
memtable/wbwi_memtable.cc Normal file
View file

@ -0,0 +1,147 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "memtable/wbwi_memtable.h"
#include "db/memtable.h"
namespace ROCKSDB_NAMESPACE {
const std::unordered_map<WriteType, ValueType>
WBWIMemTableIterator::WriteTypeToValueTypeMap = {
{kPutRecord, kTypeValue},
{kMergeRecord, kTypeMerge},
{kDeleteRecord, kTypeDeletion},
{kSingleDeleteRecord, kTypeSingleDeletion},
{kDeleteRangeRecord, kTypeRangeDeletion},
{kPutEntityRecord, kTypeWideColumnEntity},
// Only the above record types are added to WBWI.
// kLogDataRecord, kXIDRecord, kUnknownRecord
};
bool WBWIMemTable::Get(const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp,
Status* s, MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* out_seq, const ReadOptions&,
bool immutable_memtable, ReadCallback* callback,
bool* is_blob_index, bool do_merge) {
(void)immutable_memtable;
(void)timestamp;
(void)columns;
assert(immutable_memtable);
assert(!timestamp); // TODO: support UDT
assert(!columns); // TODO: support WideColumn
assert(global_seqno_ != kMaxSequenceNumber);
// WBWI does not support DeleteRange yet.
assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
[[maybe_unused]] SequenceNumber read_seq =
GetInternalKeySeqno(key.internal_key());
std::unique_ptr<InternalIterator> iter{NewIterator()};
iter->Seek(key.internal_key());
const Slice lookup_user_key = key.user_key();
while (iter->Valid() && comparator_->EqualWithoutTimestamp(
ExtractUserKey(iter->key()), lookup_user_key)) {
uint64_t tag = ExtractInternalKeyFooter(iter->key());
ValueType type;
SequenceNumber seq;
UnPackSequenceAndType(tag, &seq, &type);
// Unsupported operations.
assert(type != kTypeBlobIndex);
assert(type != kTypeWideColumnEntity);
assert(type != kTypeValuePreferredSeqno);
assert(type != kTypeDeletionWithTimestamp);
assert(type != kTypeMerge);
if (!callback || callback->IsVisible(seq)) {
if (*out_seq == kMaxSequenceNumber) {
*out_seq = std::max(seq, *max_covering_tombstone_seq);
}
if (*max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion;
}
switch (type) {
case kTypeValue: {
HandleTypeValue(lookup_user_key, iter->value(), iter->IsValuePinned(),
do_merge, s->IsMergeInProgress(), merge_context,
moptions_.merge_operator, clock_,
moptions_.statistics, moptions_.info_log, s, value,
columns, is_blob_index);
assert(seq <= read_seq);
return /*found_final_value=*/true;
}
case kTypeDeletion:
case kTypeSingleDeletion:
case kTypeRangeDeletion: {
HandleTypeDeletion(lookup_user_key, s->IsMergeInProgress(),
merge_context, moptions_.merge_operator, clock_,
moptions_.statistics, moptions_.info_log, s, value,
columns);
assert(seq <= read_seq);
return /*found_final_value=*/true;
}
default: {
std::string msg("Unrecognized or unsupported value type: " +
std::to_string(static_cast<int>(type)) + ". ");
msg.append("User key: " +
ExtractUserKey(iter->key()).ToString(/*hex=*/true) + ". ");
msg.append("seq: " + std::to_string(seq) + ".");
*s = Status::Corruption(msg.c_str());
return /*found_final_value=*/true;
}
}
}
// Current key not visible or we read a merge key
assert(s->IsMergeInProgress() || (callback && !callback->IsVisible(seq)));
iter->Next();
}
if (!iter->status().ok() &&
(s->ok() || s->IsMergeInProgress() || s->IsNotFound())) {
*s = iter->status();
// stop further look up
return true;
}
return /*found_final_value=*/false;
}
void WBWIMemTable::MultiGet(const ReadOptions& read_options,
MultiGetRange* range, ReadCallback* callback,
bool immutable_memtable) {
(void)immutable_memtable;
// Should only be used as immutable memtable.
assert(immutable_memtable);
// TODO: reuse the InternalIterator created in Get().
for (auto iter = range->begin(); iter != range->end(); ++iter) {
SequenceNumber dummy_seq;
bool found_final_value =
Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr,
iter->columns, iter->timestamp, iter->s, &(iter->merge_context),
&(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true,
callback, nullptr, true);
if (found_final_value) {
if (iter->s->ok() || iter->s->IsNotFound()) {
if (iter->value) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
} else {
assert(iter->columns);
range->AddValueSize(iter->columns->serialized_size());
}
}
range->MarkKeyDone(iter);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
// Set all remaining keys in range to Abort
for (auto range_iter = range->begin(); range_iter != range->end();
++range_iter) {
range->MarkKeyDone(range_iter);
*(range_iter->s) = Status::Aborted();
}
break;
}
}
}
}
} // namespace ROCKSDB_NAMESPACE

334
memtable/wbwi_memtable.h Normal file
View file

@ -0,0 +1,334 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "db/memtable.h"
#include "rocksdb/utilities/write_batch_with_index.h"
namespace ROCKSDB_NAMESPACE {
class WBWIMemTableIterator final : public InternalIterator {
public:
WBWIMemTableIterator(std::unique_ptr<WBWIIterator>&& it, SequenceNumber seqno,
const Comparator* comparator)
: it_(std::move(it)), global_seqno_(seqno), comparator_(comparator) {
assert(seqno != kMaxSequenceNumber);
s_.PermitUncheckedError();
}
// No copying allowed
WBWIMemTableIterator(const WBWIMemTableIterator&) = delete;
WBWIMemTableIterator& operator=(const WBWIMemTableIterator&) = delete;
bool Valid() const override { return valid_; }
void SeekToFirst() override {
it_->SeekToFirst();
UpdateKey();
}
void SeekToLast() override {
it_->SeekToLast();
UpdateKey();
}
void Seek(const Slice& target) override {
Slice target_user_key = ExtractUserKey(target);
it_->Seek(target_user_key);
if (it_->Valid()) {
// compare seqno
SequenceNumber seqno = GetInternalKeySeqno(target);
if (seqno < global_seqno_ &&
comparator_->Compare(it_->Entry().key, target_user_key) == 0) {
it_->Next();
// TODO: cannot assume distinct keys once Merge is supported
if (it_->Valid()) {
assert(comparator_->Compare(it_->Entry().key, target_user_key) > 0);
}
}
}
UpdateKey();
}
void SeekForPrev(const Slice& target) override {
Slice target_user_key = ExtractUserKey(target);
it_->SeekForPrev(target_user_key);
if (it_->Valid()) {
SequenceNumber seqno = GetInternalKeySeqno(target);
if (seqno > global_seqno_ &&
comparator_->Compare(it_->Entry().key, target_user_key) == 0) {
it_->Prev();
if (it_->Valid()) {
// TODO: cannot assume distinct keys once Merge is supported
assert(comparator_->Compare(it_->Entry().key, target_user_key) < 0);
}
}
}
UpdateKey();
}
void Next() override {
assert(Valid());
it_->Next();
UpdateKey();
}
bool NextAndGetResult(IterateResult* result) override {
assert(Valid());
Next();
bool is_valid = Valid();
if (is_valid) {
result->key = key();
result->bound_check_result = IterBoundCheck::kUnknown;
result->value_prepared = true;
}
return is_valid;
}
void Prev() override {
assert(Valid());
it_->Prev();
UpdateKey();
}
Slice key() const override {
assert(Valid());
return key_;
}
Slice value() const override {
assert(Valid());
return it_->Entry().value;
}
Status status() const override {
assert(it_->status().ok());
return s_;
}
private:
static const std::unordered_map<WriteType, ValueType> WriteTypeToValueTypeMap;
void UpdateKey() {
valid_ = it_->Valid();
if (!Valid()) {
key_.clear();
return;
}
auto t = WriteTypeToValueTypeMap.find(it_->Entry().type);
assert(t != WriteTypeToValueTypeMap.end());
if (t == WriteTypeToValueTypeMap.end()) {
key_.clear();
valid_ = false;
s_ = Status::Corruption("Unexpected write_batch_with_index entry type " +
std::to_string(t->second));
return;
}
key_buf_.SetInternalKey(it_->Entry().key, global_seqno_, t->second);
key_ = key_buf_.GetInternalKey();
}
std::unique_ptr<WBWIIterator> it_;
// The sequence number of entries in this write batch.
SequenceNumber global_seqno_;
const Comparator* comparator_;
IterKey key_buf_;
// The current internal key.
Slice key_;
Status s_;
bool valid_ = false;
};
class WBWIMemTable final : public ReadOnlyMemTable {
public:
WBWIMemTable(const std::shared_ptr<WriteBatchWithIndex>& wbwi,
const Comparator* cmp, uint32_t cf_id,
const ImmutableOptions* immutable_options,
const MutableCFOptions* cf_options)
: wbwi_(wbwi),
comparator_(cmp),
ikey_comparator_(comparator_),
moptions_(*immutable_options, *cf_options),
clock_(immutable_options->clock),
cf_id_(cf_id) {}
// No copying allowed
WBWIMemTable(const WBWIMemTable&) = delete;
WBWIMemTable& operator=(const WBWIMemTable&) = delete;
~WBWIMemTable() override = default;
const char* Name() const override { return "WBWIMemTable"; }
size_t ApproximateMemoryUsage() override {
// FIXME: we can calculate for each CF or just divide evenly among CFs
// Used in ReportFlushInputSize(), MemPurgeDecider, flush job event logging,
// and InternalStats::HandleCurSizeAllMemTables
return 0;
}
size_t MemoryAllocatedBytes() const override {
// FIXME: similar to ApproximateMemoryUsage().
// Used in MemTableList to trim memtable history.
return 0;
}
void UniqueRandomSample(
const uint64_t& /* target_sample_size */,
std::unordered_set<const char*>* /* entries */) override {
// TODO: support mempurge
assert(false);
}
InternalIterator* NewIterator(
const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena,
const SliceTransform* /* prefix_extractor */) override {
// Ingested WBWIMemTable should have an assigned seqno
assert(global_seqno_ != kMaxSequenceNumber);
assert(arena);
auto mem = arena->AllocateAligned(sizeof(WBWIMemTableIterator));
return new (mem) WBWIMemTableIterator(
std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)),
global_seqno_, comparator_);
}
// Returns an iterator that wraps a MemTableIterator and logically strips the
// user-defined timestamp of each key. This API is only used by flush when
// user-defined timestamps in MemTable only feature is enabled.
InternalIterator* NewTimestampStrippingIterator(
const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena,
const SliceTransform*, size_t) override {
// TODO: support UDT
assert(false);
return NewErrorInternalIterator(
Status::NotSupported(
"WBWIMemTable does not support NewTimestampStrippingIterator."),
arena);
}
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions&, SequenceNumber, bool) override {
// TODO: support DeleteRange
assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
return nullptr;
}
FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
const ReadOptions&, SequenceNumber, size_t) override {
// TODO: support UDT
assert(false);
return nullptr;
}
// FIXME: not a good practice to use default parameter with virtual function
using ReadOnlyMemTable::Get;
bool Get(const LookupKey& key, std::string* value,
PinnableWideColumns* columns, std::string* timestamp, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
const ReadOptions& read_opts, bool immutable_memtable,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
bool do_merge = true) override;
void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
ReadCallback* callback, bool immutable_memtable) override;
uint64_t NumEntries() const override {
// FIXME: used in
// - verify number of entries processed during flush
// - stats for estimate num entries and num entries in immutable memtables
// - MemPurgeDecider
return 0;
}
uint64_t NumDeletion() const override {
// FIXME: this is used for stats and event logging
return 0;
}
uint64_t NumRangeDeletion() const override {
// FIXME
assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
return 0;
}
uint64_t GetDataSize() const override {
// FIXME: used in event logging in flush_job
return 0;
}
SequenceNumber GetEarliestSequenceNumber() override { return global_seqno_; }
bool IsEmpty() const override {
// Ideally also check that wbwi contains updates from this CF. For now, we
// only create WBWIMemTable for CFs with updates in wbwi.
return wbwi_->GetWriteBatch()->Count() == 0;
}
SequenceNumber GetFirstSequenceNumber() override { return global_seqno_; }
uint64_t GetMinLogContainingPrepSection() override {
// FIXME: used to retain WAL with pending Prepare
return min_prep_log_referenced_;
}
void MarkImmutable() override {}
void MarkFlushed() override {}
MemTableStats ApproximateStats(const Slice&, const Slice&) override {
// FIXME: used for query planning
return {};
}
const InternalKeyComparator& GetInternalKeyComparator() const override {
return ikey_comparator_;
}
uint64_t ApproximateOldestKeyTime() const override {
// FIXME: can use the time when this is added to the DB.
return kUnknownOldestAncesterTime;
}
bool IsFragmentedRangeTombstonesConstructed() const override {
assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
return true;
}
const Slice& GetNewestUDT() const override {
// FIXME: support UDT
assert(false);
return newest_udt_;
}
// Assign a sequence number to the entries in this memtable.
void SetGlobalSequenceNumber(SequenceNumber global_seqno) {
// Not expecting to assign seqno multiple times.
assert(global_seqno_ == kMaxSequenceNumber);
global_seqno_ = global_seqno;
}
private:
InternalIterator* NewIterator() const {
assert(global_seqno_ != kMaxSequenceNumber);
return new WBWIMemTableIterator(
std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)),
global_seqno_, comparator_);
}
Slice newest_udt_;
std::shared_ptr<WriteBatchWithIndex> wbwi_;
const Comparator* comparator_;
InternalKeyComparator ikey_comparator_;
SequenceNumber global_seqno_ = kMaxSequenceNumber;
const ImmutableMemTableOptions moptions_;
SystemClock* clock_;
uint64_t min_prep_log_referenced_{0};
// WBWI can contains updates to multiple CFs. `cf_id_` determines which CF
// this memtable is for.
uint32_t cf_id_;
};
} // namespace ROCKSDB_NAMESPACE

1
src.mk
View file

@ -141,6 +141,7 @@ LIB_SOURCES = \
memtable/hash_skiplist_rep.cc \ memtable/hash_skiplist_rep.cc \
memtable/skiplistrep.cc \ memtable/skiplistrep.cc \
memtable/vectorrep.cc \ memtable/vectorrep.cc \
memtable/wbwi_memtable.cc \
memtable/write_buffer_manager.cc \ memtable/write_buffer_manager.cc \
monitoring/histogram.cc \ monitoring/histogram.cc \
monitoring/histogram_windowing.cc \ monitoring/histogram_windowing.cc \

View file

@ -313,6 +313,11 @@ WBWIIterator* WriteBatchWithIndex::NewIterator(
&(rep->comparator)); &(rep->comparator));
} }
WBWIIterator* WriteBatchWithIndex::NewIterator(uint32_t cf_id) const {
return new WBWIIteratorImpl(cf_id, &(rep->skip_list), &rep->write_batch,
&(rep->comparator));
}
Iterator* WriteBatchWithIndex::NewIteratorWithBase( Iterator* WriteBatchWithIndex::NewIteratorWithBase(
ColumnFamilyHandle* column_family, Iterator* base_iterator, ColumnFamilyHandle* column_family, Iterator* base_iterator,
const ReadOptions* read_options) { const ReadOptions* read_options) {

View file

@ -9,11 +9,14 @@
#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/write_batch_with_index.h"
#include <db/db_test_util.h>
#include <map> #include <map>
#include <memory> #include <memory>
#include "db/column_family.h" #include "db/column_family.h"
#include "db/wide/wide_columns_helper.h" #include "db/wide/wide_columns_helper.h"
#include "memtable/wbwi_memtable.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "test_util/testutil.h" #include "test_util/testutil.h"
@ -3415,6 +3418,281 @@ TEST_P(WriteBatchWithIndexTest, EntityReadSanityChecks) {
} }
INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool());
std::string Get(const std::string& k, std::unique_ptr<WBWIMemTable>& wbwi_mem,
SequenceNumber snapshot_seq, bool* found_final_value) {
LookupKey lkey(k, snapshot_seq);
std::string val;
SequenceNumber max_range_del_seqno = 0;
SequenceNumber out_seqno = 0;
bool is_blob_index = false;
Status s;
*found_final_value = wbwi_mem->Get(
lkey, &val, nullptr, nullptr, &s, nullptr, &max_range_del_seqno,
&out_seqno, ReadOptions(), true, nullptr, &is_blob_index, true);
if (s.ok()) {
if (*found_final_value) {
EXPECT_FALSE(val.empty());
return val;
}
return "NOT_FOUND";
}
EXPECT_TRUE(s.IsNotFound());
EXPECT_TRUE(*found_final_value);
return "NOT_FOUND";
}
class WBWIMemTableTest : public testing::Test {};
TEST_F(WBWIMemTableTest, ReadFromWBWIMemtable) {
// Mini stress test for read.
// Do random 10000 put and delete operations then do some overwrite.
// Keep track of expected state, then verify with Get, MultiGet, and Iterator.
const Comparator* cmp = BytewiseComparator();
Options opts;
ImmutableOptions immutable_opts(opts);
MutableCFOptions mutable_cf_options(opts);
Random rnd(301);
auto wbwi = std::make_shared<WriteBatchWithIndex>(cmp, 0, true, 0, 0);
std::vector<std::pair<std::string, std::string>> expected;
expected.resize(10000);
for (int i = 0; i < 10000; ++i) {
// Leave a non-existing key 9999 in between existing keys to test read.
std::string key = i < 9999 ? DBTestBase::Key(i) : DBTestBase::Key(i + 1);
bool del = rnd.OneIn(2);
std::string val = del ? "NOT_FOUND" : rnd.RandomString(50);
expected[i] = std::make_pair(key, val);
}
// Random insertion order
RandomShuffle(expected.begin(), expected.end());
std::unique_ptr<WBWIMemTable> wbwi_mem{
new WBWIMemTable(wbwi, cmp,
/*cf_id=*/0, &immutable_opts, &mutable_cf_options)};
ASSERT_TRUE(wbwi_mem->IsEmpty());
constexpr SequenceNumber visible_seq = 3;
constexpr SequenceNumber non_visible_seq = 1;
constexpr SequenceNumber assigned_seq = 2;
wbwi_mem->SetGlobalSequenceNumber(assigned_seq);
bool found_final_value = false;
for (const auto& [key, val] : expected) {
if (val == "NOT_FOUND") {
if (rnd.OneIn(2)) {
ASSERT_OK(wbwi->SingleDelete(key));
} else {
ASSERT_OK(wbwi->Delete(key));
}
} else {
ASSERT_OK(wbwi->Put(key, val));
}
found_final_value = false;
// We are writing to wbwi after WBWIMemtable is created. This won't
// happen with normal usage, but we just use the hack for testing here.
ASSERT_TRUE(val == Get(key, wbwi_mem, visible_seq, &found_final_value));
ASSERT_TRUE(found_final_value);
}
ASSERT_FALSE(wbwi_mem->IsEmpty());
// Some data with same key in another CF
ColumnFamilyHandleImplDummy meta_cf(/*id=*/1, BytewiseComparator());
ASSERT_OK(wbwi->Put(&meta_cf, DBTestBase::Key(0), "foo"));
RandomShuffle(expected.begin(), expected.end());
// overwrites
for (size_t i = 0; i < 2000; ++i) {
// We don't expect mixing SD and DEL, or issue multiple SD consecutively in
// a DB. Read from WBWI should still work so we do it here to keep the test
// simple.
if (rnd.OneIn(2)) {
std::string val = rnd.RandomString(100);
expected[i].second = val;
ASSERT_OK(wbwi->Put(expected[i].first, val));
} else {
expected[i].second = "NOT_FOUND";
if (rnd.OneIn(2)) {
ASSERT_OK(wbwi->SingleDelete(expected[i].first));
} else {
ASSERT_OK(wbwi->Delete(expected[i].first));
}
}
found_final_value = false;
ASSERT_TRUE(expected[i].second == Get(expected[i].first, wbwi_mem,
visible_seq, &found_final_value));
ASSERT_TRUE(found_final_value);
}
// Get a non-existing key
found_final_value = false;
ASSERT_EQ("NOT_FOUND", Get("foo", wbwi_mem, visible_seq, &found_final_value));
ASSERT_FALSE(found_final_value);
ASSERT_EQ("NOT_FOUND", Get(DBTestBase::Key(9999), wbwi_mem, visible_seq,
&found_final_value));
ASSERT_FALSE(found_final_value);
// Get with a non-visible snapshot
found_final_value = false;
ASSERT_EQ("NOT_FOUND", Get(DBTestBase::Key(0), wbwi_mem, non_visible_seq,
&found_final_value));
ASSERT_FALSE(found_final_value);
// Get existing keys
RandomShuffle(expected.begin(), expected.end());
for (const auto& [key, val] : expected) {
found_final_value = false;
ASSERT_TRUE(val == Get(key, wbwi_mem, visible_seq, &found_final_value));
ASSERT_TRUE(found_final_value);
}
// MultiGet
int batch_size = 30;
for (int i = 0; i < 10000; i += batch_size) {
for (uint64_t read_seq : {non_visible_seq, visible_seq}) {
autovector<KeyContext> key_context;
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE> sorted_keys;
sorted_keys.resize(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
std::vector<Slice> key_slice(batch_size);
std::vector<std::string> key_str(batch_size);
for (int j = 0; j < batch_size; ++j) {
if (i + j >= 10000) {
// read non-existing keys
// the last key in expected is 10000
key_str[i + j - 10000] = DBTestBase::Key(i + j + 1);
key_slice[j] = key_str[i + j - 10000];
} else {
key_slice[j] = expected[i + j].first;
}
key_context.emplace_back(/*col_family=*/nullptr, key_slice[j],
&values[j], /*cols=*/nullptr, /*ts=*/nullptr,
&statuses[j]);
}
for (int j = 0; j < batch_size; ++j) {
sorted_keys[j] = &key_context[j];
}
std::sort(sorted_keys.begin(), sorted_keys.begin() + batch_size,
[](const KeyContext* a, const KeyContext* b) {
return a->key->compare(*b->key) < 0;
});
MultiGetContext ctx(&sorted_keys, 0, batch_size, read_seq, ReadOptions(),
immutable_opts.fs.get(),
immutable_opts.statistics.get());
MultiGetRange range = ctx.GetMultiGetRange();
wbwi_mem->MultiGet(ReadOptions(), &range, /*callback=*/nullptr,
/*immutable_memtable=*/true);
for (int j = 0; j < batch_size; ++j) {
if (read_seq != visible_seq || i + j >= 10000) {
// Nothing is found in WBWIMemtable, status and value are not set.
ASSERT_OK(statuses[j]);
ASSERT_TRUE(values[j].empty());
} else if (expected[i + j].second == "NOT_FOUND") {
ASSERT_TRUE(statuses[j].IsNotFound());
} else {
ASSERT_OK(statuses[j]);
ASSERT_EQ(values[j], expected[i + j].second);
}
}
}
}
// Sort keys to compare with iterator
std::sort(expected.begin(), expected.end(),
[](const std::pair<std::string, std::string>& a,
const std::pair<std::string, std::string>& b) {
return a.first < b.first;
});
Arena arena;
InternalIterator* iter =
wbwi_mem->NewIterator(ReadOptions(), /*seqno_to_time_mapping=*/nullptr,
&arena, /*prefix_extractor=*/nullptr);
ASSERT_OK(iter->status());
auto verify_iter_at = [&](size_t idx) {
ASSERT_TRUE(iter->Valid());
ASSERT_OK(iter->status());
ASSERT_EQ(ExtractUserKey(iter->key()), expected[idx].first);
SequenceNumber seq;
ValueType val_type;
UnPackSequenceAndType(ExtractInternalKeyFooter(iter->key()), &seq,
&val_type);
ASSERT_EQ(seq, assigned_seq);
if (expected[idx].second == "NOT_FOUND") {
ASSERT_TRUE(val_type == kTypeDeletion || val_type == kTypeSingleDeletion);
} else {
ASSERT_EQ(val_type, kTypeValue);
ASSERT_EQ(iter->value(), expected[idx].second);
}
};
// Seek then next, prev
IterKey seek_key;
for (int i = 0; i < 1000; i++) {
uint32_t key_idx = rnd.Uniform(10000);
for (bool seek_for_prev : {false, true}) {
if (seek_for_prev) {
seek_key.SetInternalKey(expected[key_idx].first, 0,
kValueTypeForSeekForPrev);
iter->SeekForPrev(seek_key.GetInternalKey());
} else {
seek_key.SetInternalKey(expected[key_idx].first, visible_seq,
kValueTypeForSeek);
iter->Seek(seek_key.GetInternalKey());
}
verify_iter_at(key_idx);
uint32_t j = key_idx + 1;
for (; j < std::min(10000u, key_idx + 5); ++j) {
iter->Next();
verify_iter_at(j);
}
for (j -= 2; j >= key_idx; --j) {
iter->Prev();
verify_iter_at(j);
}
}
}
iter->SeekToFirst();
for (size_t i = 0; i < expected.size(); ++i) {
verify_iter_at(i);
iter->Next();
}
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->Valid());
iter->SeekToLast();
for (int i = static_cast<int>(expected.size() - 1); i >= 0; --i) {
verify_iter_at(i);
iter->Prev();
}
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->Valid());
// Read from another CF
std::unique_ptr<WBWIMemTable> meta_wbwi_mem{new WBWIMemTable(
wbwi, cmp, /*cf_id=*/1, &immutable_opts, &mutable_cf_options)};
meta_wbwi_mem->SetGlobalSequenceNumber(assigned_seq);
found_final_value = false;
ASSERT_TRUE("foo" == Get(DBTestBase::Key(0), meta_wbwi_mem, visible_seq,
&found_final_value));
ASSERT_TRUE(found_final_value);
found_final_value = false;
ASSERT_TRUE("NOT_FOUND" == Get(DBTestBase::Key(1), meta_wbwi_mem, visible_seq,
&found_final_value));
ASSERT_FALSE(found_final_value);
// Deleting one memtable should not affect another memtable with the same wbwi
wbwi_mem.reset();
// allocated by arena
iter->~InternalIterator();
iter = meta_wbwi_mem->NewIterator(ReadOptions(), nullptr, &arena, nullptr);
iter->SeekToFirst();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(ExtractUserKey(iter->key()), DBTestBase::Key(0));
ASSERT_EQ(iter->value(), "foo");
iter->Next();
ASSERT_OK(iter->status());
ASSERT_FALSE(iter->Valid());
iter->~InternalIterator();
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {