From 1c7652fcef3d75ea199a73c5d7997d4933e9de84 Mon Sep 17 00:00:00 2001 From: Changyu Bi Date: Tue, 12 Nov 2024 09:27:11 -0800 Subject: [PATCH] Introduce a WriteBatchWithIndex-based implementation of ReadOnlyMemTable (#13123) Summary: introduce the class WBWIMemTable that implements ReadOnlyMemTable interface with data stored in a WriteBatchWithIndex object. This PR implements the main read path: Get, MultiGet and Iterator. It only supports Put, Delete and SingleDelete operations for now. All the keys in the WBWIMemTable will be assigned a global sequence number through WBWIMemTable::SetGlobalSequenceNumber(). Planned follow up PRs: - Create WBWIMemTable with a transaction's WBWI and ingest it into a DB during Transaction::Commit() - Support for Merge. This will be more complicated since we can have multiple updates with the same user key for Merge. - Support for other operations like WideColumn and other ReadOnlyMemTable methods. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13123 Test Plan: * A mini-stress test for the read path is added as a new unit test Reviewed By: jowlyzhang Differential Revision: D65633419 Pulled By: cbi42 fbshipit-source-id: 0684fe47260b41f51ca39c300eb72ca5bc9c5a3b --- CMakeLists.txt | 1 + TARGETS | 1 + db/external_sst_file_ingestion_job.cc | 2 +- db/memtable.cc | 64 +--- db/memtable.h | 77 +++- .../utilities/write_batch_with_index.h | 10 +- memtable/wbwi_memtable.cc | 147 ++++++++ memtable/wbwi_memtable.h | 334 ++++++++++++++++++ src.mk | 1 + .../write_batch_with_index.cc | 5 + .../write_batch_with_index_test.cc | 278 +++++++++++++++ 11 files changed, 857 insertions(+), 63 deletions(-) create mode 100644 memtable/wbwi_memtable.cc create mode 100644 memtable/wbwi_memtable.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 17dced38ca..a5905d3bd1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -779,6 +779,7 @@ set(SOURCES memtable/hash_skiplist_rep.cc memtable/skiplistrep.cc memtable/vectorrep.cc + 
memtable/wbwi_memtable.cc memtable/write_buffer_manager.cc monitoring/histogram.cc monitoring/histogram_windowing.cc diff --git a/TARGETS b/TARGETS index f09e5fffd1..8592ffcb18 100644 --- a/TARGETS +++ b/TARGETS @@ -149,6 +149,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "memtable/hash_skiplist_rep.cc", "memtable/skiplistrep.cc", "memtable/vectorrep.cc", + "memtable/wbwi_memtable.cc", "memtable/write_buffer_manager.cc", "monitoring/histogram.cc", "monitoring/histogram_windowing.cc", diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 61588b2453..c9c823e4fb 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -400,7 +400,7 @@ Status ExternalSstFileIngestionJob::Run() { // specific state of Memtables. The mutable Memtable should be empty, and the // immutable Memtable list should be empty. if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 || - super_version->mem->GetDataSize() != 0)) { + !super_version->mem->IsEmpty())) { return Status::TryAgain( "Inconsistent memtable state detected when flushed before run."); } diff --git a/db/memtable.cc b/db/memtable.cc index ef8ac07c78..1d3ee7eef4 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -1251,47 +1251,16 @@ static bool SaveValue(void* arg, const char* entry) { case kTypeValue: case kTypeValuePreferredSeqno: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); - if (type == kTypeValuePreferredSeqno) { v = ParsePackedValueForValue(v); } - *(s->status) = Status::OK(); - - if (!s->do_merge) { - // Preserve the value with the goal of returning it as part of - // raw merge operands to the user - // TODO(yanqin) update MergeContext so that timestamps information - // can also be retained. 
- - merge_context->PushOperand( - v, s->inplace_update_support == false /* operand_pinned */); - } else if (*(s->merge_in_progress)) { - assert(s->do_merge); - - if (s->value || s->columns) { - // `op_failure_scope` (an output parameter) is not provided (set to - // nullptr) since a failure must be propagated regardless of its - // value. - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), - MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(), - s->logger, s->statistics, s->clock, - /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr, - s->value, s->columns); - } - } else if (s->value) { - s->value->assign(v.data(), v.size()); - } else if (s->columns) { - s->columns->SetPlainValue(v); - } - + ReadOnlyMemTable::HandleTypeValue( + s->key->user_key(), v, s->inplace_update_support == false, + s->do_merge, *(s->merge_in_progress), merge_context, + s->merge_operator, s->clock, s->statistics, s->logger, s->status, + s->value, s->columns, s->is_blob_index); *(s->found_final_value) = true; - - if (s->is_blob_index != nullptr) { - *(s->is_blob_index) = false; - } - return false; } case kTypeWideColumnEntity: { @@ -1348,25 +1317,10 @@ static bool SaveValue(void* arg, const char* entry) { case kTypeDeletionWithTimestamp: case kTypeSingleDeletion: case kTypeRangeDeletion: { - if (*(s->merge_in_progress)) { - if (s->value || s->columns) { - // `op_failure_scope` (an output parameter) is not provided (set to - // nullptr) since a failure must be propagated regardless of its - // value. - *(s->status) = MergeHelper::TimedFullMerge( - merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue, - merge_context->GetOperands(), s->logger, s->statistics, - s->clock, /* update_num_ops_stats */ true, - /* op_failure_scope */ nullptr, s->value, s->columns); - } else { - // We have found a final value (a base deletion) and have newer - // merge operands that we do not intend to merge. 
Nothing remains - // to be done so assign status to OK. - *(s->status) = Status::OK(); - } - } else { - *(s->status) = Status::NotFound(); - } + ReadOnlyMemTable::HandleTypeDeletion( + s->key->user_key(), *(s->merge_in_progress), s->merge_context, + s->merge_operator, s->clock, s->statistics, s->logger, s->status, + s->value, s->columns); *(s->found_final_value) = true; return false; } diff --git a/db/memtable.h b/db/memtable.h index ae86831176..897a3d079d 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -18,6 +18,7 @@ #include "db/dbformat.h" #include "db/kv_checksum.h" +#include "db/merge_helper.h" #include "db/range_tombstone_fragmenter.h" #include "db/read_callback.h" #include "db/seqno_to_time_mapping.h" @@ -266,6 +267,11 @@ class ReadOnlyMemTable { // operations on the same MemTable (unless this Memtable is immutable). virtual SequenceNumber GetFirstSequenceNumber() = 0; + // Returns if there is no entry inserted to the mem table. + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + virtual bool IsEmpty() const = 0; + // Returns the sequence number that is guaranteed to be smaller than or equal // to the sequence number of any key that could be inserted into this // memtable. 
It can then be assumed that any write with a larger(or equal) @@ -366,12 +372,76 @@ class ReadOnlyMemTable { void SetFlushJobInfo(std::unique_ptr&& info) { flush_job_info_ = std::move(info); - }; + } std::unique_ptr ReleaseFlushJobInfo() { return std::move(flush_job_info_); } + static void HandleTypeValue( + const Slice& lookup_user_key, const Slice& value, bool value_pinned, + bool do_merge, bool merge_in_progress, MergeContext* merge_context, + const MergeOperator* merge_operator, SystemClock* clock, + Statistics* statistics, Logger* info_log, Status* s, + std::string* out_value, PinnableWideColumns* out_columns, + bool* is_blob_index) { + *s = Status::OK(); + + if (!do_merge) { + // Preserve the value with the goal of returning it as part of + // raw merge operands to the user + // TODO(yanqin) update MergeContext so that timestamps information + // can also be retained. + merge_context->PushOperand(value, value_pinned); + } else if (merge_in_progress) { + assert(do_merge); + // `op_failure_scope` (an output parameter) is not provided (set to + // nullptr) since a failure must be propagated regardless of its + // value. 
+ if (out_value || out_columns) { + *s = MergeHelper::TimedFullMerge( + merge_operator, lookup_user_key, MergeHelper::kPlainBaseValue, + value, merge_context->GetOperands(), info_log, statistics, clock, + /* update_num_ops_stats */ true, + /* op_failure_scope */ nullptr, out_value, out_columns); + } + } else if (out_value) { + out_value->assign(value.data(), value.size()); + } else if (out_columns) { + out_columns->SetPlainValue(value); + } + + if (is_blob_index) { + *is_blob_index = false; + } + } + + static void HandleTypeDeletion( + const Slice& lookup_user_key, bool merge_in_progress, + MergeContext* merge_context, const MergeOperator* merge_operator, + SystemClock* clock, Statistics* statistics, Logger* logger, Status* s, + std::string* out_value, PinnableWideColumns* out_columns) { + if (merge_in_progress) { + if (out_value || out_columns) { + // `op_failure_scope` (an output parameter) is not provided (set to + // nullptr) since a failure must be propagated regardless of its + // value. + *s = MergeHelper::TimedFullMerge( + merge_operator, lookup_user_key, MergeHelper::kNoBaseValue, + merge_context->GetOperands(), logger, statistics, clock, + /* update_num_ops_stats */ true, + /* op_failure_scope */ nullptr, out_value, out_columns); + } else { + // We have found a final value (a base deletion) and have newer + // merge operands that we do not intend to merge. Nothing remains + // to be done so assign status to OK. + *s = Status::OK(); + } + } else { + *s = Status::NotFound(); + } + } + protected: friend class MemTableList; @@ -603,10 +673,7 @@ class MemTable final : public ReadOnlyMemTable { } } - // Returns if there is no entry inserted to the mem table. - // REQUIRES: external synchronization to prevent simultaneous - // operations on the same MemTable (unless this Memtable is immutable). 
- bool IsEmpty() const { return first_seqno_ == 0; } + bool IsEmpty() const override { return first_seqno_ == 0; } SequenceNumber GetFirstSequenceNumber() override { return first_seqno_.load(std::memory_order_relaxed); diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index ad66236478..59bb4228df 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -188,8 +188,10 @@ class WriteBatchWithIndex : public WriteBatchBase { // Create an iterator of a column family. User can call iterator.Seek() to // search to the next entry of or after a key. Keys will be iterated in the // order given by index_comparator. For multiple updates on the same key, - // each update will be returned as a separate entry, in the order of update - // time. + // if overwrite_key=false, then each update will be returned as a separate + // entry, in the order of update time. + // if overwrite_key=true, then one entry per key will be returned. Merge + // updates on the same key will be returned as separate entries. // // The returned iterator should be deleted by the caller. WBWIIterator* NewIterator(ColumnFamilyHandle* column_family); @@ -354,6 +356,10 @@ class WriteBatchWithIndex : public WriteBatchBase { friend class WriteUnpreparedTxn; friend class WriteBatchWithIndex_SubBatchCnt_Test; friend class WriteBatchWithIndexInternal; + friend class WBWIMemTable; + + WBWIIterator* NewIterator(uint32_t cf_id) const; + // Returns the number of sub-batches inside the write batch. A sub-batch // starts right before inserting a key that is a duplicate of a key in the // last sub-batch. diff --git a/memtable/wbwi_memtable.cc b/memtable/wbwi_memtable.cc new file mode 100644 index 0000000000..9774842119 --- /dev/null +++ b/memtable/wbwi_memtable.cc @@ -0,0 +1,147 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "memtable/wbwi_memtable.h" + +#include "db/memtable.h" + +namespace ROCKSDB_NAMESPACE { + +const std::unordered_map + WBWIMemTableIterator::WriteTypeToValueTypeMap = { + {kPutRecord, kTypeValue}, + {kMergeRecord, kTypeMerge}, + {kDeleteRecord, kTypeDeletion}, + {kSingleDeleteRecord, kTypeSingleDeletion}, + {kDeleteRangeRecord, kTypeRangeDeletion}, + {kPutEntityRecord, kTypeWideColumnEntity}, + // Only the above record types are added to WBWI. + // kLogDataRecord, kXIDRecord, kUnknownRecord +}; + +bool WBWIMemTable::Get(const LookupKey& key, std::string* value, + PinnableWideColumns* columns, std::string* timestamp, + Status* s, MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, + SequenceNumber* out_seq, const ReadOptions&, + bool immutable_memtable, ReadCallback* callback, + bool* is_blob_index, bool do_merge) { + (void)immutable_memtable; + (void)timestamp; + (void)columns; + assert(immutable_memtable); + assert(!timestamp); // TODO: support UDT + assert(!columns); // TODO: support WideColumn + assert(global_seqno_ != kMaxSequenceNumber); + // WBWI does not support DeleteRange yet. + assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); + + [[maybe_unused]] SequenceNumber read_seq = + GetInternalKeySeqno(key.internal_key()); + std::unique_ptr iter{NewIterator()}; + iter->Seek(key.internal_key()); + const Slice lookup_user_key = key.user_key(); + + while (iter->Valid() && comparator_->EqualWithoutTimestamp( + ExtractUserKey(iter->key()), lookup_user_key)) { + uint64_t tag = ExtractInternalKeyFooter(iter->key()); + ValueType type; + SequenceNumber seq; + UnPackSequenceAndType(tag, &seq, &type); + // Unsupported operations. 
+    assert(type != kTypeBlobIndex);
+    assert(type != kTypeWideColumnEntity);
+    assert(type != kTypeValuePreferredSeqno);
+    assert(type != kTypeDeletionWithTimestamp);
+    assert(type != kTypeMerge);
+    if (!callback || callback->IsVisible(seq)) {
+      if (*out_seq == kMaxSequenceNumber) {
+        *out_seq = std::max(seq, *max_covering_tombstone_seq);
+      }
+      if (*max_covering_tombstone_seq > seq) {
+        type = kTypeRangeDeletion;
+      }
+      switch (type) {
+        case kTypeValue: {
+          HandleTypeValue(lookup_user_key, iter->value(), iter->IsValuePinned(),
+                          do_merge, s->IsMergeInProgress(), merge_context,
+                          moptions_.merge_operator, clock_,
+                          moptions_.statistics, moptions_.info_log, s, value,
+                          columns, is_blob_index);
+          assert(seq <= read_seq);
+          return /*found_final_value=*/true;
+        }
+        case kTypeDeletion:
+        case kTypeSingleDeletion:
+        case kTypeRangeDeletion: {
+          HandleTypeDeletion(lookup_user_key, s->IsMergeInProgress(),
+                             merge_context, moptions_.merge_operator, clock_,
+                             moptions_.statistics, moptions_.info_log, s, value,
+                             columns);
+          assert(seq <= read_seq);
+          return /*found_final_value=*/true;
+        }
+        default: {
+          std::string msg("Unrecognized or unsupported value type: " +
+                          std::to_string(static_cast<int>(type)) + ". ");
+          msg.append("User key: " +
+                     ExtractUserKey(iter->key()).ToString(/*hex=*/true) + ". ");
+          msg.append("seq: " + std::to_string(seq) + ".");
+          *s = Status::Corruption(msg.c_str());
+          return /*found_final_value=*/true;
+        }
+      }
+    }
+    // Current key not visible or we read a merge key
+    assert(s->IsMergeInProgress() || (callback && !callback->IsVisible(seq)));
+    iter->Next();
+  }
+  if (!iter->status().ok() &&
+      (s->ok() || s->IsMergeInProgress() || s->IsNotFound())) {
+    *s = iter->status();
+    // stop further look up
+    return true;
+  }
+  return /*found_final_value=*/false;
+}
+
+void WBWIMemTable::MultiGet(const ReadOptions& read_options,
+                            MultiGetRange* range, ReadCallback* callback,
+                            bool immutable_memtable) {
+  (void)immutable_memtable;
+  // Should only be used as immutable memtable.
+  assert(immutable_memtable);
+  // TODO: reuse the InternalIterator created in Get().
+  for (auto iter = range->begin(); iter != range->end(); ++iter) {
+    SequenceNumber dummy_seq = 0;  // Get() reads *out_seq; keep it defined
+    bool found_final_value =
+        Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr,
+            iter->columns, iter->timestamp, iter->s, &(iter->merge_context),
+            &(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true,
+            callback, nullptr, true);
+    if (found_final_value) {
+      if (iter->s->ok() || iter->s->IsNotFound()) {
+        if (iter->value) {
+          iter->value->PinSelf();
+          range->AddValueSize(iter->value->size());
+        } else {
+          assert(iter->columns);
+          range->AddValueSize(iter->columns->serialized_size());
+        }
+      }
+      range->MarkKeyDone(iter);
+      if (range->GetValueSize() > read_options.value_size_soft_limit) {
+        // Set all remaining keys in range to Abort
+        for (auto range_iter = range->begin(); range_iter != range->end();
+             ++range_iter) {
+          range->MarkKeyDone(range_iter);
+          *(range_iter->s) = Status::Aborted();
+        }
+        break;
+      }
+    }
+  }
+}
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/memtable/wbwi_memtable.h b/memtable/wbwi_memtable.h
new file mode 100644
index 0000000000..1daf5d7797
--- /dev/null
+++ b/memtable/wbwi_memtable.h
@@ -0,0 +1,334 @@
+// Copyright (c) Meta Platforms, Inc.
and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include "db/memtable.h" +#include "rocksdb/utilities/write_batch_with_index.h" + +namespace ROCKSDB_NAMESPACE { + +class WBWIMemTableIterator final : public InternalIterator { + public: + WBWIMemTableIterator(std::unique_ptr&& it, SequenceNumber seqno, + const Comparator* comparator) + : it_(std::move(it)), global_seqno_(seqno), comparator_(comparator) { + assert(seqno != kMaxSequenceNumber); + s_.PermitUncheckedError(); + } + + // No copying allowed + WBWIMemTableIterator(const WBWIMemTableIterator&) = delete; + WBWIMemTableIterator& operator=(const WBWIMemTableIterator&) = delete; + + bool Valid() const override { return valid_; } + + void SeekToFirst() override { + it_->SeekToFirst(); + UpdateKey(); + } + + void SeekToLast() override { + it_->SeekToLast(); + UpdateKey(); + } + + void Seek(const Slice& target) override { + Slice target_user_key = ExtractUserKey(target); + it_->Seek(target_user_key); + if (it_->Valid()) { + // compare seqno + SequenceNumber seqno = GetInternalKeySeqno(target); + if (seqno < global_seqno_ && + comparator_->Compare(it_->Entry().key, target_user_key) == 0) { + it_->Next(); + // TODO: cannot assume distinct keys once Merge is supported + if (it_->Valid()) { + assert(comparator_->Compare(it_->Entry().key, target_user_key) > 0); + } + } + } + UpdateKey(); + } + + void SeekForPrev(const Slice& target) override { + Slice target_user_key = ExtractUserKey(target); + it_->SeekForPrev(target_user_key); + if (it_->Valid()) { + SequenceNumber seqno = GetInternalKeySeqno(target); + if (seqno > global_seqno_ && + comparator_->Compare(it_->Entry().key, target_user_key) == 0) { + it_->Prev(); + if (it_->Valid()) { + // TODO: cannot assume distinct keys once Merge is supported + assert(comparator_->Compare(it_->Entry().key, 
target_user_key) < 0);
+        }
+      }
+    }
+    UpdateKey();
+  }
+
+  void Next() override {
+    assert(Valid());
+    it_->Next();
+    UpdateKey();
+  }
+
+  bool NextAndGetResult(IterateResult* result) override {
+    assert(Valid());
+    Next();
+    bool is_valid = Valid();
+    if (is_valid) {
+      result->key = key();
+      result->bound_check_result = IterBoundCheck::kUnknown;
+      result->value_prepared = true;
+    }
+    return is_valid;
+  }
+
+  void Prev() override {
+    assert(Valid());
+    it_->Prev();
+    UpdateKey();
+  }
+
+  Slice key() const override {
+    assert(Valid());
+    return key_;
+  }
+
+  Slice value() const override {
+    assert(Valid());
+    return it_->Entry().value;
+  }
+
+  Status status() const override {
+    assert(it_->status().ok());
+    return s_;
+  }
+
+ private:
+  static const std::unordered_map<WriteType, ValueType> WriteTypeToValueTypeMap;
+
+  void UpdateKey() {
+    valid_ = it_->Valid();
+    if (!Valid()) {
+      key_.clear();
+      return;
+    }
+    auto t = WriteTypeToValueTypeMap.find(it_->Entry().type);
+    assert(t != WriteTypeToValueTypeMap.end());
+    if (t == WriteTypeToValueTypeMap.end()) {  // end(): never dereference t
+      key_.clear();
+      valid_ = false;
+      s_ = Status::Corruption("Unexpected write_batch_with_index entry type " +
+                              std::to_string(it_->Entry().type));
+      return;
+    }
+    key_buf_.SetInternalKey(it_->Entry().key, global_seqno_, t->second);
+    key_ = key_buf_.GetInternalKey();
+  }
+
+  std::unique_ptr<WBWIIterator> it_;
+  // The sequence number of entries in this write batch.
+  SequenceNumber global_seqno_;
+  const Comparator* comparator_;
+  IterKey key_buf_;
+  // The current internal key.
+ Slice key_; + Status s_; + bool valid_ = false; +}; + +class WBWIMemTable final : public ReadOnlyMemTable { + public: + WBWIMemTable(const std::shared_ptr& wbwi, + const Comparator* cmp, uint32_t cf_id, + const ImmutableOptions* immutable_options, + const MutableCFOptions* cf_options) + : wbwi_(wbwi), + comparator_(cmp), + ikey_comparator_(comparator_), + moptions_(*immutable_options, *cf_options), + clock_(immutable_options->clock), + cf_id_(cf_id) {} + + // No copying allowed + WBWIMemTable(const WBWIMemTable&) = delete; + WBWIMemTable& operator=(const WBWIMemTable&) = delete; + + ~WBWIMemTable() override = default; + + const char* Name() const override { return "WBWIMemTable"; } + + size_t ApproximateMemoryUsage() override { + // FIXME: we can calculate for each CF or just divide evenly among CFs + // Used in ReportFlushInputSize(), MemPurgeDecider, flush job event logging, + // and InternalStats::HandleCurSizeAllMemTables + return 0; + } + + size_t MemoryAllocatedBytes() const override { + // FIXME: similar to ApproximateMemoryUsage(). + // Used in MemTableList to trim memtable history. + return 0; + } + + void UniqueRandomSample( + const uint64_t& /* target_sample_size */, + std::unordered_set* /* entries */) override { + // TODO: support mempurge + assert(false); + } + + InternalIterator* NewIterator( + const ReadOptions&, UnownedPtr, Arena* arena, + const SliceTransform* /* prefix_extractor */) override { + // Ingested WBWIMemTable should have an assigned seqno + assert(global_seqno_ != kMaxSequenceNumber); + assert(arena); + auto mem = arena->AllocateAligned(sizeof(WBWIMemTableIterator)); + return new (mem) WBWIMemTableIterator( + std::unique_ptr(wbwi_->NewIterator(cf_id_)), + global_seqno_, comparator_); + } + + // Returns an iterator that wraps a MemTableIterator and logically strips the + // user-defined timestamp of each key. This API is only used by flush when + // user-defined timestamps in MemTable only feature is enabled. 
+ InternalIterator* NewTimestampStrippingIterator( + const ReadOptions&, UnownedPtr, Arena* arena, + const SliceTransform*, size_t) override { + // TODO: support UDT + assert(false); + return NewErrorInternalIterator( + Status::NotSupported( + "WBWIMemTable does not support NewTimestampStrippingIterator."), + arena); + } + + FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator( + const ReadOptions&, SequenceNumber, bool) override { + // TODO: support DeleteRange + assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); + return nullptr; + } + + FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator( + const ReadOptions&, SequenceNumber, size_t) override { + // TODO: support UDT + assert(false); + return nullptr; + } + + // FIXME: not a good practice to use default parameter with virtual function + using ReadOnlyMemTable::Get; + bool Get(const LookupKey& key, std::string* value, + PinnableWideColumns* columns, std::string* timestamp, Status* s, + MergeContext* merge_context, + SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, + const ReadOptions& read_opts, bool immutable_memtable, + ReadCallback* callback = nullptr, bool* is_blob_index = nullptr, + bool do_merge = true) override; + + void MultiGet(const ReadOptions& read_options, MultiGetRange* range, + ReadCallback* callback, bool immutable_memtable) override; + + uint64_t NumEntries() const override { + // FIXME: used in + // - verify number of entries processed during flush + // - stats for estimate num entries and num entries in immutable memtables + // - MemPurgeDecider + return 0; + } + + uint64_t NumDeletion() const override { + // FIXME: this is used for stats and event logging + return 0; + } + + uint64_t NumRangeDeletion() const override { + // FIXME + assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); + return 0; + } + + uint64_t GetDataSize() const override { + // FIXME: used in event logging in flush_job + return 0; + } + + SequenceNumber 
GetEarliestSequenceNumber() override { return global_seqno_; } + + bool IsEmpty() const override { + // Ideally also check that wbwi contains updates from this CF. For now, we + // only create WBWIMemTable for CFs with updates in wbwi. + return wbwi_->GetWriteBatch()->Count() == 0; + } + + SequenceNumber GetFirstSequenceNumber() override { return global_seqno_; } + + uint64_t GetMinLogContainingPrepSection() override { + // FIXME: used to retain WAL with pending Prepare + return min_prep_log_referenced_; + } + + void MarkImmutable() override {} + + void MarkFlushed() override {} + + MemTableStats ApproximateStats(const Slice&, const Slice&) override { + // FIXME: used for query planning + return {}; + } + + const InternalKeyComparator& GetInternalKeyComparator() const override { + return ikey_comparator_; + } + + uint64_t ApproximateOldestKeyTime() const override { + // FIXME: can use the time when this is added to the DB. + return kUnknownOldestAncesterTime; + } + + bool IsFragmentedRangeTombstonesConstructed() const override { + assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); + return true; + } + + const Slice& GetNewestUDT() const override { + // FIXME: support UDT + assert(false); + return newest_udt_; + } + + // Assign a sequence number to the entries in this memtable. + void SetGlobalSequenceNumber(SequenceNumber global_seqno) { + // Not expecting to assign seqno multiple times. 
+ assert(global_seqno_ == kMaxSequenceNumber); + global_seqno_ = global_seqno; + } + + private: + InternalIterator* NewIterator() const { + assert(global_seqno_ != kMaxSequenceNumber); + return new WBWIMemTableIterator( + std::unique_ptr(wbwi_->NewIterator(cf_id_)), + global_seqno_, comparator_); + } + + Slice newest_udt_; + std::shared_ptr wbwi_; + const Comparator* comparator_; + InternalKeyComparator ikey_comparator_; + SequenceNumber global_seqno_ = kMaxSequenceNumber; + const ImmutableMemTableOptions moptions_; + SystemClock* clock_; + uint64_t min_prep_log_referenced_{0}; + // WBWI can contain updates to multiple CFs. `cf_id_` determines which CF + // this memtable is for. + uint32_t cf_id_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 7cbc69d140..fe6dc248de 100644 --- a/src.mk +++ b/src.mk @@ -141,6 +141,7 @@ LIB_SOURCES = \ memtable/hash_skiplist_rep.cc \ memtable/skiplistrep.cc \ memtable/vectorrep.cc \ + memtable/wbwi_memtable.cc \ memtable/write_buffer_manager.cc \ monitoring/histogram.cc \ monitoring/histogram_windowing.cc \ diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index 93122bfb49..db4c53847b 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -313,6 +313,11 @@ WBWIIterator* WriteBatchWithIndex::NewIterator( &(rep->comparator)); } +WBWIIterator* WriteBatchWithIndex::NewIterator(uint32_t cf_id) const { + return new WBWIIteratorImpl(cf_id, &(rep->skip_list), &rep->write_batch, + &(rep->comparator)); +} + Iterator* WriteBatchWithIndex::NewIteratorWithBase( ColumnFamilyHandle* column_family, Iterator* base_iterator, const ReadOptions* read_options) { diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 8ed2f866bb..113f84d9fe 100644 --- 
a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -9,11 +9,14 @@ #include "rocksdb/utilities/write_batch_with_index.h" +#include + #include #include #include "db/column_family.h" #include "db/wide/wide_columns_helper.h" +#include "memtable/wbwi_memtable.h" #include "port/stack_trace.h" #include "test_util/testharness.h" #include "test_util/testutil.h" @@ -3415,6 +3418,281 @@ TEST_P(WriteBatchWithIndexTest, EntityReadSanityChecks) { } INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); + +std::string Get(const std::string& k, std::unique_ptr& wbwi_mem, + SequenceNumber snapshot_seq, bool* found_final_value) { + LookupKey lkey(k, snapshot_seq); + std::string val; + SequenceNumber max_range_del_seqno = 0; + SequenceNumber out_seqno = 0; + bool is_blob_index = false; + Status s; + *found_final_value = wbwi_mem->Get( + lkey, &val, nullptr, nullptr, &s, nullptr, &max_range_del_seqno, + &out_seqno, ReadOptions(), true, nullptr, &is_blob_index, true); + if (s.ok()) { + if (*found_final_value) { + EXPECT_FALSE(val.empty()); + return val; + } + return "NOT_FOUND"; + } + EXPECT_TRUE(s.IsNotFound()); + EXPECT_TRUE(*found_final_value); + return "NOT_FOUND"; +} + +class WBWIMemTableTest : public testing::Test {}; + +TEST_F(WBWIMemTableTest, ReadFromWBWIMemtable) { + // Mini stress test for read. + // Do random 10000 put and delete operations then do some overwrite. + // Keep track of expected state, then verify with Get, MultiGet, and Iterator. + const Comparator* cmp = BytewiseComparator(); + Options opts; + ImmutableOptions immutable_opts(opts); + MutableCFOptions mutable_cf_options(opts); + + Random rnd(301); + auto wbwi = std::make_shared(cmp, 0, true, 0, 0); + std::vector> expected; + expected.resize(10000); + for (int i = 0; i < 10000; ++i) { + // Leave a non-existing key 9999 in between existing keys to test read. + std::string key = i < 9999 ? 
DBTestBase::Key(i) : DBTestBase::Key(i + 1); + bool del = rnd.OneIn(2); + std::string val = del ? "NOT_FOUND" : rnd.RandomString(50); + expected[i] = std::make_pair(key, val); + } + // Random insertion order + RandomShuffle(expected.begin(), expected.end()); + std::unique_ptr wbwi_mem{ + new WBWIMemTable(wbwi, cmp, + /*cf_id=*/0, &immutable_opts, &mutable_cf_options)}; + ASSERT_TRUE(wbwi_mem->IsEmpty()); + constexpr SequenceNumber visible_seq = 3; + constexpr SequenceNumber non_visible_seq = 1; + constexpr SequenceNumber assigned_seq = 2; + wbwi_mem->SetGlobalSequenceNumber(assigned_seq); + + bool found_final_value = false; + for (const auto& [key, val] : expected) { + if (val == "NOT_FOUND") { + if (rnd.OneIn(2)) { + ASSERT_OK(wbwi->SingleDelete(key)); + } else { + ASSERT_OK(wbwi->Delete(key)); + } + } else { + ASSERT_OK(wbwi->Put(key, val)); + } + found_final_value = false; + // We are writing to wbwi after WBWIMemtable is created. This won't + // happen with normal usage, but we just use the hack for testing here. + ASSERT_TRUE(val == Get(key, wbwi_mem, visible_seq, &found_final_value)); + ASSERT_TRUE(found_final_value); + } + ASSERT_FALSE(wbwi_mem->IsEmpty()); + + // Some data with same key in another CF + ColumnFamilyHandleImplDummy meta_cf(/*id=*/1, BytewiseComparator()); + ASSERT_OK(wbwi->Put(&meta_cf, DBTestBase::Key(0), "foo")); + + RandomShuffle(expected.begin(), expected.end()); + // overwrites + for (size_t i = 0; i < 2000; ++i) { + // We don't expect mixing SD and DEL, or issue multiple SD consecutively in + // a DB. Read from WBWI should still work so we do it here to keep the test + // simple. 
+ if (rnd.OneIn(2)) { + std::string val = rnd.RandomString(100); + expected[i].second = val; + ASSERT_OK(wbwi->Put(expected[i].first, val)); + } else { + expected[i].second = "NOT_FOUND"; + if (rnd.OneIn(2)) { + ASSERT_OK(wbwi->SingleDelete(expected[i].first)); + } else { + ASSERT_OK(wbwi->Delete(expected[i].first)); + } + } + found_final_value = false; + ASSERT_TRUE(expected[i].second == Get(expected[i].first, wbwi_mem, + visible_seq, &found_final_value)); + ASSERT_TRUE(found_final_value); + } + // Get a non-existing key + found_final_value = false; + ASSERT_EQ("NOT_FOUND", Get("foo", wbwi_mem, visible_seq, &found_final_value)); + ASSERT_FALSE(found_final_value); + ASSERT_EQ("NOT_FOUND", Get(DBTestBase::Key(9999), wbwi_mem, visible_seq, + &found_final_value)); + ASSERT_FALSE(found_final_value); + // Get with a non-visible snapshot + found_final_value = false; + ASSERT_EQ("NOT_FOUND", Get(DBTestBase::Key(0), wbwi_mem, non_visible_seq, + &found_final_value)); + ASSERT_FALSE(found_final_value); + // Get existing keys + RandomShuffle(expected.begin(), expected.end()); + for (const auto& [key, val] : expected) { + found_final_value = false; + ASSERT_TRUE(val == Get(key, wbwi_mem, visible_seq, &found_final_value)); + ASSERT_TRUE(found_final_value); + } + + // MultiGet + int batch_size = 30; + for (int i = 0; i < 10000; i += batch_size) { + for (uint64_t read_seq : {non_visible_seq, visible_seq}) { + autovector key_context; + autovector sorted_keys; + sorted_keys.resize(batch_size); + std::vector values(batch_size); + std::vector statuses(batch_size); + std::vector key_slice(batch_size); + std::vector key_str(batch_size); + for (int j = 0; j < batch_size; ++j) { + if (i + j >= 10000) { + // read non-existing keys + // the last key in expected is 10000 + key_str[i + j - 10000] = DBTestBase::Key(i + j + 1); + key_slice[j] = key_str[i + j - 10000]; + } else { + key_slice[j] = expected[i + j].first; + } + key_context.emplace_back(/*col_family=*/nullptr, key_slice[j], + 
&values[j], /*cols=*/nullptr, /*ts=*/nullptr, + &statuses[j]); + } + for (int j = 0; j < batch_size; ++j) { + sorted_keys[j] = &key_context[j]; + } + std::sort(sorted_keys.begin(), sorted_keys.begin() + batch_size, + [](const KeyContext* a, const KeyContext* b) { + return a->key->compare(*b->key) < 0; + }); + + MultiGetContext ctx(&sorted_keys, 0, batch_size, read_seq, ReadOptions(), + immutable_opts.fs.get(), + immutable_opts.statistics.get()); + MultiGetRange range = ctx.GetMultiGetRange(); + wbwi_mem->MultiGet(ReadOptions(), &range, /*callback=*/nullptr, + /*immutable_memtable=*/true); + for (int j = 0; j < batch_size; ++j) { + if (read_seq != visible_seq || i + j >= 10000) { + // Nothing is found in WBWIMemtable, status and value are not set. + ASSERT_OK(statuses[j]); + ASSERT_TRUE(values[j].empty()); + } else if (expected[i + j].second == "NOT_FOUND") { + ASSERT_TRUE(statuses[j].IsNotFound()); + } else { + ASSERT_OK(statuses[j]); + ASSERT_EQ(values[j], expected[i + j].second); + } + } + } + } + + // Sort keys to compare with iterator + std::sort(expected.begin(), expected.end(), + [](const std::pair& a, + const std::pair& b) { + return a.first < b.first; + }); + Arena arena; + InternalIterator* iter = + wbwi_mem->NewIterator(ReadOptions(), /*seqno_to_time_mapping=*/nullptr, + &arena, /*prefix_extractor=*/nullptr); + ASSERT_OK(iter->status()); + + auto verify_iter_at = [&](size_t idx) { + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(ExtractUserKey(iter->key()), expected[idx].first); + + SequenceNumber seq; + ValueType val_type; + UnPackSequenceAndType(ExtractInternalKeyFooter(iter->key()), &seq, + &val_type); + ASSERT_EQ(seq, assigned_seq); + if (expected[idx].second == "NOT_FOUND") { + ASSERT_TRUE(val_type == kTypeDeletion || val_type == kTypeSingleDeletion); + } else { + ASSERT_EQ(val_type, kTypeValue); + ASSERT_EQ(iter->value(), expected[idx].second); + } + }; + + // Seek then next, prev + IterKey seek_key; + for (int i = 0; i < 1000; 
i++) { + uint32_t key_idx = rnd.Uniform(10000); + for (bool seek_for_prev : {false, true}) { + if (seek_for_prev) { + seek_key.SetInternalKey(expected[key_idx].first, 0, + kValueTypeForSeekForPrev); + iter->SeekForPrev(seek_key.GetInternalKey()); + } else { + seek_key.SetInternalKey(expected[key_idx].first, visible_seq, + kValueTypeForSeek); + iter->Seek(seek_key.GetInternalKey()); + } + verify_iter_at(key_idx); + uint32_t j = key_idx + 1; + for (; j < std::min(10000u, key_idx + 5); ++j) { + iter->Next(); + verify_iter_at(j); + } + for (j -= 2; j >= key_idx; --j) { + iter->Prev(); + verify_iter_at(j); + } + } + } + + iter->SeekToFirst(); + for (size_t i = 0; i < expected.size(); ++i) { + verify_iter_at(i); + iter->Next(); + } + ASSERT_OK(iter->status()); + ASSERT_FALSE(iter->Valid()); + iter->SeekToLast(); + for (int i = static_cast(expected.size() - 1); i >= 0; --i) { + verify_iter_at(i); + iter->Prev(); + } + ASSERT_OK(iter->status()); + ASSERT_FALSE(iter->Valid()); + + // Read from another CF + std::unique_ptr meta_wbwi_mem{new WBWIMemTable( + wbwi, cmp, /*cf_id=*/1, &immutable_opts, &mutable_cf_options)}; + meta_wbwi_mem->SetGlobalSequenceNumber(assigned_seq); + found_final_value = false; + ASSERT_TRUE("foo" == Get(DBTestBase::Key(0), meta_wbwi_mem, visible_seq, + &found_final_value)); + ASSERT_TRUE(found_final_value); + found_final_value = false; + ASSERT_TRUE("NOT_FOUND" == Get(DBTestBase::Key(1), meta_wbwi_mem, visible_seq, + &found_final_value)); + ASSERT_FALSE(found_final_value); + // Deleting one memtable should not affect another memtable with the same wbwi + wbwi_mem.reset(); + // allocated by arena + iter->~InternalIterator(); + iter = meta_wbwi_mem->NewIterator(ReadOptions(), nullptr, &arena, nullptr); + iter->SeekToFirst(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(ExtractUserKey(iter->key()), DBTestBase::Key(0)); + ASSERT_EQ(iter->value(), "foo"); + iter->Next(); + ASSERT_OK(iter->status()); + 
ASSERT_FALSE(iter->Valid()); + iter->~InternalIterator(); +} } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {