diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f649c591e..c803293ff0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -865,6 +865,8 @@ set(SOURCES util/string_util.cc util/thread_local.cc util/threadpool_imp.cc + util/udt_util.cc + util/write_batch_util.cc util/xxhash.cc utilities/agg_merge/agg_merge.cc utilities/backup/backup_engine.cc @@ -1421,6 +1423,7 @@ if(WITH_TESTS) util/timer_test.cc util/thread_list_test.cc util/thread_local_test.cc + util/udt_util_test.cc util/work_queue_test.cc utilities/agg_merge/agg_merge_test.cc utilities/backup/backup_engine_test.cc diff --git a/Makefile b/Makefile index b499e8be13..a1ea379d7a 100644 --- a/Makefile +++ b/Makefile @@ -1417,6 +1417,9 @@ thread_local_test: $(OBJ_DIR)/util/thread_local_test.o $(TEST_LIBRARY) $(LIBRARY work_queue_test: $(OBJ_DIR)/util/work_queue_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +udt_util_test: $(OBJ_DIR)/util/udt_util_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + corruption_test: $(OBJ_DIR)/db/corruption_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index ff5321312c..5125fcf54a 100644 --- a/TARGETS +++ b/TARGETS @@ -264,6 +264,8 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "util/string_util.cc", "util/thread_local.cc", "util/threadpool_imp.cc", + "util/udt_util.cc", + "util/write_batch_util.cc", "util/xxhash.cc", "utilities/agg_merge/agg_merge.cc", "utilities/backup/backup_engine.cc", @@ -5508,6 +5510,12 @@ cpp_unittest_wrapper(name="ttl_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="udt_util_test", + srcs=["util/udt_util_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="util_merge_operators_test", srcs=["utilities/util_merge_operators_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc index c6fcefddc7..114b659f5f 100644 --- a/db/db_impl/db_impl_secondary.cc +++ b/db/db_impl/db_impl_secondary.cc @@ -14,6 +14,7 @@ #include "monitoring/perf_context_imp.h" #include "rocksdb/configurable.h" #include "util/cast_util.h" +#include "util/write_batch_util.h" namespace ROCKSDB_NAMESPACE { diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 3c21986221..faaa987218 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -273,85 +273,6 @@ class DBImplSecondary : public DBImpl { return Status::OK(); } - // ColumnFamilyCollector is a write batch handler which does nothing - // except recording unique column family IDs - class ColumnFamilyCollector : public WriteBatch::Handler { - std::unordered_set column_family_ids_; - - Status AddColumnFamilyId(uint32_t column_family_id) { - if (column_family_ids_.find(column_family_id) == - column_family_ids_.end()) { - column_family_ids_.insert(column_family_id); - } - return Status::OK(); - } - - public: - explicit ColumnFamilyCollector() {} - - ~ColumnFamilyCollector() override {} - - Status PutCF(uint32_t column_family_id, const Slice&, - const Slice&) override { - return AddColumnFamilyId(column_family_id); - } - - Status DeleteCF(uint32_t column_family_id, const Slice&) override { - return AddColumnFamilyId(column_family_id); - } - - Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override { - return AddColumnFamilyId(column_family_id); - } - - Status DeleteRangeCF(uint32_t column_family_id, const Slice&, - const Slice&) override { - return AddColumnFamilyId(column_family_id); - } - - Status MergeCF(uint32_t column_family_id, const Slice&, - const Slice&) override { - return AddColumnFamilyId(column_family_id); - } - - Status PutBlobIndexCF(uint32_t column_family_id, const Slice&, - const Slice&) override { - return AddColumnFamilyId(column_family_id); - } - - Status MarkBeginPrepare(bool) override { return Status::OK(); } - - Status MarkEndPrepare(const Slice&) override { return Status::OK(); } - - Status MarkRollback(const Slice&) override { return Status::OK(); } - - Status MarkCommit(const Slice&) override { return Status::OK(); } - - Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { - return Status::OK(); - } - - Status MarkNoop(bool) override { return Status::OK(); } - - const std::unordered_set& column_families() const { - return column_family_ids_; - } - }; - - Status CollectColumnFamilyIdsFromWriteBatch( - const WriteBatch& batch, std::vector* column_family_ids) { - assert(column_family_ids != nullptr); - column_family_ids->clear(); - ColumnFamilyCollector handler; - Status s = batch.Iterate(&handler); - if (s.ok()) { - for (const auto& cf : handler.column_families()) { - column_family_ids->push_back(cf); - } - } - return s; - } - bool OwnTablesAndLogs() const override { // Currently, the secondary instance does not own the database files. It // simply opens the files of the primary instance and tracks their file diff --git a/src.mk b/src.mk index eb70ac04b7..7d2663b995 100644 --- a/src.mk +++ b/src.mk @@ -251,6 +251,8 @@ LIB_SOURCES = \ util/string_util.cc \ util/thread_local.cc \ util/threadpool_imp.cc \ + util/udt_util.cc \ + util/write_batch_util.cc \ util/xxhash.cc \ utilities/agg_merge/agg_merge.cc \ utilities/backup/backup_engine.cc \ @@ -593,6 +595,7 @@ TEST_MAIN_SOURCES = \ util/timer_test.cc \ util/thread_list_test.cc \ util/thread_local_test.cc \ + util/udt_util_test.cc \ util/work_queue_test.cc \ utilities/agg_merge/agg_merge_test.cc \ utilities/backup/backup_engine_test.cc \ diff --git a/util/udt_util.cc b/util/udt_util.cc new file mode 100644 index 0000000000..b57d49e075 --- /dev/null +++ b/util/udt_util.cc @@ -0,0 +1,259 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/udt_util.h" + +#include "db/dbformat.h" +#include "rocksdb/types.h" +#include "util/write_batch_util.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +enum class RecoveryType { + kNoop, + kUnrecoverable, + kStripTimestamp, + kPadTimestamp, +}; + +RecoveryType GetRecoveryType(const size_t running_ts_sz, + const std::optional& recorded_ts_sz) { + if (running_ts_sz == 0) { + if (!recorded_ts_sz.has_value()) { + // A column family id not recorded is equivalent to that column family has + // zero timestamp size. + return RecoveryType::kNoop; + } + return RecoveryType::kStripTimestamp; + } + + assert(running_ts_sz != 0); + + if (!recorded_ts_sz.has_value()) { + return RecoveryType::kPadTimestamp; + } + + if (running_ts_sz != recorded_ts_sz.value()) { + return RecoveryType::kUnrecoverable; + } + + return RecoveryType::kNoop; +} + +bool AllRunningColumnFamiliesConsistent( + const std::unordered_map& running_ts_sz, + const std::unordered_map& record_ts_sz) { + for (const auto& [cf_id, ts_sz] : running_ts_sz) { + auto record_it = record_ts_sz.find(cf_id); + RecoveryType recovery_type = + GetRecoveryType(ts_sz, record_it != record_ts_sz.end() + ? std::optional(record_it->second) + : std::nullopt); + if (recovery_type != RecoveryType::kNoop) { + return false; + } + } + return true; +} + +Status CheckWriteBatchTimestampSizeConsistency( + const WriteBatch* batch, + const std::unordered_map& running_ts_sz, + const std::unordered_map& record_ts_sz, + TimestampSizeConsistencyMode check_mode, bool* ts_need_recovery) { + std::vector column_family_ids; + Status status = + CollectColumnFamilyIdsFromWriteBatch(*batch, &column_family_ids); + if (!status.ok()) { + return status; + } + for (const auto& cf_id : column_family_ids) { + auto running_iter = running_ts_sz.find(cf_id); + if (running_iter == running_ts_sz.end()) { + // Ignore dropped column family referred to in a WriteBatch regardless of + // its consistency. + continue; + } + auto record_iter = record_ts_sz.find(cf_id); + RecoveryType recovery_type = GetRecoveryType( + running_iter->second, record_iter != record_ts_sz.end() + ? std::optional(record_iter->second) + : std::nullopt); + if (recovery_type != RecoveryType::kNoop) { + if (check_mode == TimestampSizeConsistencyMode::kVerifyConsistency) { + return Status::InvalidArgument( + "WriteBatch contains timestamp size inconsistency."); + } + + if (recovery_type == RecoveryType::kUnrecoverable) { + return Status::InvalidArgument( + "WriteBatch contains unrecoverable timestamp size inconsistency."); + } + + // If any column family needs reconciliation, it will mark the whole + // WriteBatch to need recovery and rebuilt. + *ts_need_recovery = true; + } + } + return Status::OK(); +} +} // namespace + +TimestampRecoveryHandler::TimestampRecoveryHandler( + const std::unordered_map& running_ts_sz, + const std::unordered_map& record_ts_sz) + : running_ts_sz_(running_ts_sz), + record_ts_sz_(record_ts_sz), + new_batch_(new WriteBatch()), + handler_valid_(true) {} + +Status TimestampRecoveryHandler::PutCF(uint32_t cf, const Slice& key, + const Slice& value) { + std::string new_key_buf; + Slice new_key; + Status status = + ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key); + if (!status.ok()) { + return status; + } + return WriteBatchInternal::Put(new_batch_.get(), cf, new_key, value); +} + +Status TimestampRecoveryHandler::DeleteCF(uint32_t cf, const Slice& key) { + std::string new_key_buf; + Slice new_key; + Status status = + ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key); + if (!status.ok()) { + return status; + } + return WriteBatchInternal::Delete(new_batch_.get(), cf, new_key); +} + +Status TimestampRecoveryHandler::SingleDeleteCF(uint32_t cf, const Slice& key) { + std::string new_key_buf; + Slice new_key; + Status status = + ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key); + if (!status.ok()) { + return status; + } + return WriteBatchInternal::SingleDelete(new_batch_.get(), cf, new_key); +} + +Status TimestampRecoveryHandler::DeleteRangeCF(uint32_t cf, + const Slice& begin_key, + const Slice& end_key) { + std::string new_begin_key_buf; + Slice new_begin_key; + std::string new_end_key_buf; + Slice new_end_key; + Status status = ReconcileTimestampDiscrepancy( + cf, begin_key, &new_begin_key_buf, &new_begin_key); + if (!status.ok()) { + return status; + } + status = ReconcileTimestampDiscrepancy(cf, end_key, &new_end_key_buf, + &new_end_key); + if (!status.ok()) { + return status; + } + return WriteBatchInternal::DeleteRange(new_batch_.get(), cf, new_begin_key, + new_end_key); +} + +Status TimestampRecoveryHandler::MergeCF(uint32_t cf, const Slice& key, + const Slice& value) { + std::string new_key_buf; + Slice new_key; + Status status = + ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key); + if (!status.ok()) { + return status; + } + return WriteBatchInternal::Merge(new_batch_.get(), cf, new_key, value); +} + +Status TimestampRecoveryHandler::PutBlobIndexCF(uint32_t cf, const Slice& key, + const Slice& value) { + std::string new_key_buf; + Slice new_key; + Status status = + ReconcileTimestampDiscrepancy(cf, key, &new_key_buf, &new_key); + if (!status.ok()) { + return status; + } + return WriteBatchInternal::PutBlobIndex(new_batch_.get(), cf, new_key, value); +} + +Status TimestampRecoveryHandler::ReconcileTimestampDiscrepancy( + uint32_t cf, const Slice& key, std::string* new_key_buf, Slice* new_key) { + assert(handler_valid_); + auto running_iter = running_ts_sz_.find(cf); + if (running_iter == running_ts_sz_.end()) { + // The column family referred to by the WriteBatch is no longer running. + // Copy over the entry as is to the new WriteBatch. + *new_key = key; + return Status::OK(); + } + size_t running_ts_sz = running_iter->second; + auto record_iter = record_ts_sz_.find(cf); + std::optional record_ts_sz = + record_iter != record_ts_sz_.end() + ? std::optional(record_iter->second) + : std::nullopt; + RecoveryType recovery_type = GetRecoveryType(running_ts_sz, record_ts_sz); + + switch (recovery_type) { + case RecoveryType::kNoop: + *new_key = key; + break; + case RecoveryType::kStripTimestamp: + assert(record_ts_sz.has_value()); + *new_key = StripTimestampFromUserKey(key, record_ts_sz.value()); + break; + case RecoveryType::kPadTimestamp: + AppendKeyWithMinTimestamp(new_key_buf, key, running_ts_sz); + *new_key = *new_key_buf; + break; + case RecoveryType::kUnrecoverable: + return Status::InvalidArgument( + "Unrecoverable timestamp size inconsistency encountered by " + "TimestampRecoveryHandler."); + default: + assert(false); + } + return Status::OK(); +} + +Status HandleWriteBatchTimestampSizeDifference( + const std::unordered_map& running_ts_sz, + const std::unordered_map& record_ts_sz, + TimestampSizeConsistencyMode check_mode, + std::unique_ptr& batch) { + // Quick path to bypass checking the WriteBatch. + if (AllRunningColumnFamiliesConsistent(running_ts_sz, record_ts_sz)) { + return Status::OK(); + } + bool need_recovery = false; + Status status = CheckWriteBatchTimestampSizeConsistency( + batch.get(), running_ts_sz, record_ts_sz, check_mode, &need_recovery); + if (!status.ok()) { + return status; + } else if (need_recovery) { + SequenceNumber sequence = WriteBatchInternal::Sequence(batch.get()); + TimestampRecoveryHandler recovery_handler(running_ts_sz, record_ts_sz); + status = batch->Iterate(&recovery_handler); + if (!status.ok()) { + return status; + } else { + batch = recovery_handler.TransferNewBatch(); + WriteBatchInternal::SetSequence(batch.get(), sequence); + } + } + return Status::OK(); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util.h b/util/udt_util.h index 219a093d8d..78d2efdd48 100644 --- a/util/udt_util.h +++ b/util/udt_util.h @@ -1,14 +1,20 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// Copyright (c) Meta Platforms, Inc. and affiliates. +// // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once +#include +#include #include +#include #include +#include "db/write_batch_internal.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "rocksdb/write_batch.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { @@ -74,4 +80,132 @@ class UserDefinedTimestampSizeRecord { std::vector> cf_to_ts_sz_; }; +// This handler is used to recover a WriteBatch read from WAL logs during +// recovery. It does a best-effort recovery if the column families contained in +// the WriteBatch have inconsistency between the recorded timestamp size and the +// running timestamp size. And creates a new WriteBatch that are consistent with +// the running timestamp size with entries from the original WriteBatch. +// +// Note that for a WriteBatch with no inconsistency, a new WriteBatch is created +// nonetheless, and it should be exactly the same as the original WriteBatch. +// +// To access the new WriteBatch, invoke `TransferNewBatch` after calling +// `Iterate`. The handler becomes invalid afterwards. +// +// For the user key in each entry, the best effort recovery means: +// 1) If recorded timestamp size is 0, running timestamp size is > 0, a min +// timestamp of length running timestamp size is padded to the user key. +// 2) If recorded timestamp size is > 0, running timestamp size is 0, the last +// bytes of length recorded timestamp size is stripped from user key. +// 3) If recorded timestamp size is the same as running timestamp size, no-op. +// 4) If recorded timestamp size and running timestamp size are both non-zero +// but not equal, return Status::InvalidArgument. +class TimestampRecoveryHandler : public WriteBatch::Handler { + public: + TimestampRecoveryHandler( + const std::unordered_map& running_ts_sz, + const std::unordered_map& record_ts_sz); + + ~TimestampRecoveryHandler() override {} + + // No copy or move. + TimestampRecoveryHandler(const TimestampRecoveryHandler&) = delete; + TimestampRecoveryHandler(TimestampRecoveryHandler&&) = delete; + TimestampRecoveryHandler& operator=(const TimestampRecoveryHandler&) = delete; + TimestampRecoveryHandler& operator=(TimestampRecoveryHandler&&) = delete; + + Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override; + + Status DeleteCF(uint32_t cf, const Slice& key) override; + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override; + + Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, + const Slice& end_key) override; + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override; + + Status PutBlobIndexCF(uint32_t cf, const Slice& key, + const Slice& value) override; + + Status MarkBeginPrepare(bool) override { return Status::OK(); } + + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + + Status MarkCommit(const Slice&) override { return Status::OK(); } + + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::OK(); + } + + Status MarkRollback(const Slice&) override { return Status::OK(); } + + Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } + + std::unique_ptr&& TransferNewBatch() { + handler_valid_ = false; + return std::move(new_batch_); + } + + private: + Status ReconcileTimestampDiscrepancy(uint32_t cf, const Slice& key, + std::string* new_key_buf, + Slice* new_key); + + // Mapping from column family id to user-defined timestamp size for all + // running column families including the ones with zero timestamp size. + const std::unordered_map& running_ts_sz_; + + // Mapping from column family id to user-defined timestamp size as recorded + // in the WAL. This only contains non-zero user-defined timestamp size. + const std::unordered_map& record_ts_sz_; + + std::unique_ptr new_batch_; + // Handler is valid upon creation and becomes invalid after its `new_batch_` + // is transferred. + bool handler_valid_; +}; + +// Mode for checking and handling timestamp size inconsistency encountered in a +// WriteBatch read from WAL log. +enum class TimestampSizeConsistencyMode { + // Verified that the recorded user-defined timestamp size is consistent with + // the running one for all the column families involved in a WriteBatch. + // Column families referred to in the WriteBatch but are dropped are ignored. + kVerifyConsistency, + // Verified that if any inconsistency exists in a WriteBatch, it's all + // tolerable by a best-effort reconciliation. And optionally creates a new + // WriteBatch from the original WriteBatch that is consistent with the running + // timestamp size. Column families referred to in the WriteBatch but are + // dropped are ignored. If a new WriteBatch is created, such entries are + // copied over as is. + kReconcileInconsistency, +}; + +// Handles the inconsistency between recorded timestamp sizes and running +// timestamp sizes for a WriteBatch. A non-OK `status` indicates there are +// intolerable inconsistency with the specified `check_mode`. +// +// If `check_mode` is `kVerifyConsistency`, intolerable inconsistency means any +// running column family has an inconsistent user-defined timestamp size. +// +// If `check_mode` is `kReconcileInconsistency`, intolerable inconsistency means +// any running column family has an inconsistent user-defined timestamp size +// that cannot be reconciled with a best-effort recovery. Check +// `TimestampRecoveryHandler` for what a best-effort recovery is capable of. In +// this mode, a new WriteBatch is created on the heap and transferred to `batch` +// if there is tolerable inconsistency. +// +// An invariant that WAL logging ensures is that all timestamp size info +// is logged prior to a WriteBatch that needed this info. And zero timestamp +// size is skipped. So `record_ts_sz` only contains column family with non-zero +// timestamp size and a column family id absent from `record_ts_sz` will be +// interpreted as that column family has zero timestamp size. On the other hand, +// `running_ts_sz` should contain the timestamp size for all running column +// families including the ones with zero timestamp size. +Status HandleWriteBatchTimestampSizeDifference( + const std::unordered_map& running_ts_sz, + const std::unordered_map& record_ts_sz, + TimestampSizeConsistencyMode check_mode, + std::unique_ptr& batch); } // namespace ROCKSDB_NAMESPACE diff --git a/util/udt_util_test.cc b/util/udt_util_test.cc new file mode 100644 index 0000000000..d91b32e49d --- /dev/null +++ b/util/udt_util_test.cc @@ -0,0 +1,337 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/udt_util.h" + +#include + +#include "db/dbformat.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +static const std::string kTestKeyWithoutTs = "key"; +static const std::string kValuePlaceHolder = "value"; +} // namespace + +class HandleTimestampSizeDifferenceTest : public testing::Test { + public: + HandleTimestampSizeDifferenceTest() {} + + // Test handler used to collect the column family id and user keys contained + // in a WriteBatch for test verification. And verifies the value part stays + // the same if it's available. + class KeyCollector : public WriteBatch::Handler { + public: + explicit KeyCollector() {} + + ~KeyCollector() override {} + + Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override { + if (value.compare(kValuePlaceHolder) != 0) { + return Status::InvalidArgument(); + } + return AddKey(cf, key); + } + + Status DeleteCF(uint32_t cf, const Slice& key) override { + return AddKey(cf, key); + } + + Status SingleDeleteCF(uint32_t cf, const Slice& key) override { + return AddKey(cf, key); + } + + Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, + const Slice& end_key) override { + Status status = AddKey(cf, begin_key); + if (!status.ok()) { + return status; + } + return AddKey(cf, end_key); + } + + Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override { + if (value.compare(kValuePlaceHolder) != 0) { + return Status::InvalidArgument(); + } + return AddKey(cf, key); + } + + Status PutBlobIndexCF(uint32_t cf, const Slice& key, + const Slice& value) override { + if (value.compare(kValuePlaceHolder) != 0) { + return Status::InvalidArgument(); + } + return AddKey(cf, key); + } + + Status MarkBeginPrepare(bool) override { return Status::OK(); } + + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + + Status MarkRollback(const Slice&) override { return Status::OK(); } + + Status MarkCommit(const Slice&) override { return Status::OK(); } + + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::OK(); + } + + Status MarkNoop(bool) override { return Status::OK(); } + + const std::vector>& GetKeys() const { + return keys_; + } + + private: + Status AddKey(uint32_t cf, const Slice& key) { + keys_.push_back(std::make_pair(cf, key)); + return Status::OK(); + } + std::vector> keys_; + }; + + void CreateKey(std::string* key_buf, size_t ts_sz) { + if (ts_sz > 0) { + AppendKeyWithMinTimestamp(key_buf, kTestKeyWithoutTs, ts_sz); + } else { + key_buf->assign(kTestKeyWithoutTs); + } + } + + void CreateWriteBatch( + const std::unordered_map& ts_sz_for_batch, + std::unique_ptr& batch) { + for (const auto& [cf_id, ts_sz] : ts_sz_for_batch) { + std::string key; + CreateKey(&key, ts_sz); + ASSERT_OK( + WriteBatchInternal::Put(batch.get(), cf_id, key, kValuePlaceHolder)); + ASSERT_OK(WriteBatchInternal::Delete(batch.get(), cf_id, key)); + ASSERT_OK(WriteBatchInternal::SingleDelete(batch.get(), cf_id, key)); + ASSERT_OK(WriteBatchInternal::DeleteRange(batch.get(), cf_id, key, key)); + ASSERT_OK(WriteBatchInternal::Merge(batch.get(), cf_id, key, + kValuePlaceHolder)); + ASSERT_OK(WriteBatchInternal::PutBlobIndex(batch.get(), cf_id, key, + kValuePlaceHolder)); + } + } + + void CheckSequenceEqual(const WriteBatch& orig_batch, + const WriteBatch& new_batch) { + ASSERT_EQ(WriteBatchInternal::Sequence(&orig_batch), + WriteBatchInternal::Sequence(&new_batch)); + } + void CheckCountEqual(const WriteBatch& orig_batch, + const WriteBatch& new_batch) { + ASSERT_EQ(WriteBatchInternal::Count(&orig_batch), + WriteBatchInternal::Count(&new_batch)); + } + + void VerifyKeys( + const std::vector>& keys_with_ts, + const std::vector>& keys_without_ts, + size_t ts_sz, std::optional dropped_cf) { + ASSERT_EQ(keys_with_ts.size(), keys_without_ts.size()); + const std::string kTsMin(ts_sz, static_cast(0)); + for (size_t i = 0; i < keys_with_ts.size(); i++) { + // TimestampRecoveryHandler ignores dropped column family and copy it over + // as is. Check the keys stay the same. + if (dropped_cf.has_value() && + keys_with_ts[i].first == dropped_cf.value()) { + ASSERT_EQ(keys_with_ts[i].first, keys_without_ts[i].first); + ASSERT_EQ(keys_with_ts[i].second, keys_without_ts[i].second); + continue; + } + const Slice& key_with_ts = keys_with_ts[i].second; + const Slice& key_without_ts = keys_without_ts[i].second; + ASSERT_TRUE(key_with_ts.starts_with(key_without_ts)); + ASSERT_EQ(key_with_ts.size() - key_without_ts.size(), ts_sz); + ASSERT_TRUE(key_with_ts.ends_with(kTsMin)); + } + } + + void CheckContentsWithTimestampStripping(const WriteBatch& orig_batch, + const WriteBatch& new_batch, + size_t ts_sz, + std::optional dropped_cf) { + CheckSequenceEqual(orig_batch, new_batch); + CheckCountEqual(orig_batch, new_batch); + KeyCollector collector_for_orig_batch; + ASSERT_OK(orig_batch.Iterate(&collector_for_orig_batch)); + KeyCollector collector_for_new_batch; + ASSERT_OK(new_batch.Iterate(&collector_for_new_batch)); + VerifyKeys(collector_for_orig_batch.GetKeys(), + collector_for_new_batch.GetKeys(), ts_sz, dropped_cf); + } + + void CheckContentsWithTimestampPadding(const WriteBatch& orig_batch, + const WriteBatch& new_batch, + size_t ts_sz) { + CheckSequenceEqual(orig_batch, new_batch); + CheckCountEqual(orig_batch, new_batch); + KeyCollector collector_for_orig_batch; + ASSERT_OK(orig_batch.Iterate(&collector_for_orig_batch)); + KeyCollector collector_for_new_batch; + ASSERT_OK(new_batch.Iterate(&collector_for_new_batch)); + VerifyKeys(collector_for_new_batch.GetKeys(), + collector_for_orig_batch.GetKeys(), ts_sz, + std::nullopt /* dropped_cf */); + } +}; + +TEST_F(HandleTimestampSizeDifferenceTest, AllColumnFamiliesConsistent) { + std::unordered_map running_ts_sz = {{1, sizeof(uint64_t)}, + {2, 0}}; + std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch(running_ts_sz, batch); + const WriteBatch* orig_batch = batch.get(); + + // All `check_mode` pass with OK status and `batch` not checked or updated. + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency, batch)); + ASSERT_EQ(orig_batch, batch.get()); + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); + ASSERT_EQ(orig_batch, batch.get()); +} + +TEST_F(HandleTimestampSizeDifferenceTest, + AllInconsistentColumnFamiliesDropped) { + std::unordered_map running_ts_sz = {{2, 0}}; + std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}, + {3, sizeof(char)}}; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch(record_ts_sz, batch); + const WriteBatch* orig_batch = batch.get(); + + // All `check_mode` pass with OK status and `batch` not checked or updated. + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency, batch)); + ASSERT_EQ(orig_batch, batch.get()); + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); + ASSERT_EQ(orig_batch, batch.get()); +} + +TEST_F(HandleTimestampSizeDifferenceTest, InvolvedColumnFamiliesConsistent) { + std::unordered_map running_ts_sz = {{1, sizeof(uint64_t)}, + {2, sizeof(char)}}; + std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch(record_ts_sz, batch); + const WriteBatch* orig_batch = batch.get(); + + // All `check_mode` pass with OK status and `batch` not updated. + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency, batch)); + ASSERT_EQ(orig_batch, batch.get()); + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); + ASSERT_EQ(orig_batch, batch.get()); +} + +TEST_F(HandleTimestampSizeDifferenceTest, + InconsistentColumnFamilyNeedsTimestampStripping) { + std::unordered_map running_ts_sz = {{1, 0}, + {2, sizeof(char)}}; + std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch(record_ts_sz, batch); + const WriteBatch* orig_batch = batch.get(); + WriteBatch orig_batch_copy(*batch); + + // kVerifyConsistency doesn't tolerate inconsistency for running column + // families. + ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency, batch) + .IsInvalidArgument()); + + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); + ASSERT_NE(orig_batch, batch.get()); + CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t), + std::nullopt /* dropped_cf */); +} + +TEST_F(HandleTimestampSizeDifferenceTest, + InconsistentColumnFamilyNeedsTimestampPadding) { + std::unordered_map running_ts_sz = {{1, sizeof(uint64_t)}}; + // Make `record_ts_sz` not contain zero timestamp size entries to follow the + // behavior of actual WAL log timestamp size record. + std::unordered_map record_ts_sz; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch({{1, 0}}, batch); + const WriteBatch* orig_batch = batch.get(); + WriteBatch orig_batch_copy(*batch); + + // kVerifyConsistency doesn't tolerate inconsistency for running column + // families. + ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency, batch) + .IsInvalidArgument()); + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); + ASSERT_NE(orig_batch, batch.get()); + CheckContentsWithTimestampPadding(orig_batch_copy, *batch, sizeof(uint64_t)); +} + +TEST_F(HandleTimestampSizeDifferenceTest, + InconsistencyReconcileCopyOverDroppedColumnFamily) { + std::unordered_map running_ts_sz = {{1, 0}}; + std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}, + {2, sizeof(char)}}; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch(record_ts_sz, batch); + const WriteBatch* orig_batch = batch.get(); + WriteBatch orig_batch_copy(*batch); + + // kReconcileInconsistency tolerate inconsistency for dropped column family + // and all related entries copied over to the new WriteBatch. + ASSERT_OK(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch)); + ASSERT_NE(orig_batch, batch.get()); + CheckContentsWithTimestampStripping(orig_batch_copy, *batch, sizeof(uint64_t), + std::optional(2)); +} + +TEST_F(HandleTimestampSizeDifferenceTest, UnrecoverableInconsistency) { + std::unordered_map running_ts_sz = {{1, sizeof(char)}}; + std::unordered_map record_ts_sz = {{1, sizeof(uint64_t)}}; + std::unique_ptr batch(new WriteBatch()); + CreateWriteBatch(record_ts_sz, batch); + + ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kVerifyConsistency, batch) + .IsInvalidArgument()); + + ASSERT_TRUE(HandleWriteBatchTimestampSizeDifference( + running_ts_sz, record_ts_sz, + TimestampSizeConsistencyMode::kReconcileInconsistency, batch) + .IsInvalidArgument()); +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/util/write_batch_util.cc b/util/write_batch_util.cc new file mode 100644 index 0000000000..fa6a3b09be --- /dev/null +++ b/util/write_batch_util.cc @@ -0,0 +1,25 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/write_batch_util.h" + +namespace ROCKSDB_NAMESPACE { + +Status CollectColumnFamilyIdsFromWriteBatch( + const WriteBatch& batch, std::vector* column_family_ids) { + assert(column_family_ids != nullptr); + column_family_ids->clear(); + ColumnFamilyCollector handler; + Status s = batch.Iterate(&handler); + if (s.ok()) { + for (const auto& cf : handler.column_families()) { + column_family_ids->push_back(cf); + } + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/util/write_batch_util.h b/util/write_batch_util.h new file mode 100644 index 0000000000..70bbad9fc7 --- /dev/null +++ b/util/write_batch_util.h @@ -0,0 +1,80 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include +#include + +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "rocksdb/write_batch.h" + +namespace ROCKSDB_NAMESPACE { +// ColumnFamilyCollector is a write batch handler which does nothing +// except recording unique column family IDs +class ColumnFamilyCollector : public WriteBatch::Handler { + std::unordered_set column_family_ids_; + + Status AddColumnFamilyId(uint32_t column_family_id) { + column_family_ids_.insert(column_family_id); + return Status::OK(); + } + + public: + explicit ColumnFamilyCollector() {} + + ~ColumnFamilyCollector() override {} + + Status PutCF(uint32_t column_family_id, const Slice&, const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status DeleteCF(uint32_t column_family_id, const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status DeleteRangeCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status MergeCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status PutBlobIndexCF(uint32_t column_family_id, const Slice&, + const Slice&) override { + return AddColumnFamilyId(column_family_id); + } + + Status MarkBeginPrepare(bool) override { return Status::OK(); } + + Status MarkEndPrepare(const Slice&) override { return Status::OK(); } + + Status MarkRollback(const Slice&) override { return Status::OK(); } + + Status MarkCommit(const Slice&) override { return Status::OK(); } + + Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { + return Status::OK(); + } + + Status MarkNoop(bool) override { return Status::OK(); } + + const std::unordered_set& column_families() const { + return column_family_ids_; + } +}; + +Status CollectColumnFamilyIdsFromWriteBatch( + const WriteBatch& batch, std::vector* column_family_ids); + +} // namespace ROCKSDB_NAMESPACE