From c73d2a9d18fb8be004f142c0de1a8c8a04c2e181 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Sat, 25 Jun 2022 15:30:47 -0700 Subject: [PATCH] Add API for writing wide-column entities (#10242) Summary: The patch builds on https://github.com/facebook/rocksdb/pull/9915 and adds a new API called `PutEntity` that can be used to write a wide-column entity to the database. The new API is added to both `DB` and `WriteBatch`. Note that currently there is no way to retrieve these entities; more precisely, all read APIs (`Get`, `MultiGet`, and iterator) return `NotSupported` when they encounter a wide-column entity that is required to answer a query. Read-side support (as well as other missing functionality like `Merge`, compaction filter, and timestamp support) will be added in later PRs. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10242 Test Plan: `make check` Reviewed By: riversand963 Differential Revision: D37369748 Pulled By: ltamasi fbshipit-source-id: 7f5e412359ed7a400fd80b897dae5599dbcd685d --- CMakeLists.txt | 1 + Makefile | 3 + TARGETS | 6 + db/compaction/compaction_iterator.cc | 9 +- db/compaction/compaction_iterator_test.cc | 15 ++ db/db_impl/compacted_db_impl.h | 10 + db/db_impl/db_impl.h | 5 + db/db_impl/db_impl_readonly.h | 9 + db/db_impl/db_impl_secondary.h | 8 + db/db_impl/db_impl_write.cc | 32 +++ db/db_iter.cc | 52 ++++- db/db_iter.h | 2 + db/db_kv_checksum_test.cc | 81 +++++-- db/db_test.cc | 9 + db/memtable.cc | 28 ++- db/merge_helper.cc | 9 +- db/version_set.cc | 4 + db/version_set_sync_and_async.h | 5 + db/wide/db_wide_basic_test.cc | 213 ++++++++++++++++++ db/wide/wide_column_serialization.cc | 13 +- db/wide/wide_column_serialization_test.cc | 16 ++ db/write_batch.cc | 150 ++++++++++++ db/write_batch_internal.h | 3 + include/rocksdb/db.h | 6 + include/rocksdb/utilities/stackable_db.h | 7 + .../utilities/write_batch_with_index.h | 11 + include/rocksdb/write_batch.h | 14 ++ include/rocksdb/write_batch_base.h | 5 + src.mk | 1 + table/block_based/block.cc | 8 +- table/get_context.cc | 33 ++- table/get_context.h | 2 + 32 files changed, 714 insertions(+), 56 deletions(-) create mode 100644 db/wide/db_wide_basic_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 31f9618dfb..de32c6f2d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1301,6 +1301,7 @@ if(WITH_TESTS) db/version_set_test.cc db/wal_manager_test.cc db/wal_edit_test.cc + db/wide/db_wide_basic_test.cc db/wide/wide_column_serialization_test.cc db/write_batch_test.cc db/write_callback_test.cc diff --git a/Makefile b/Makefile index e005c61027..8233245191 100644 --- a/Makefile +++ b/Makefile @@ -1383,6 +1383,9 @@ db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIB db_readonly_with_timestamp_test: $(OBJ_DIR)/db/db_readonly_with_timestamp_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_wide_basic_test: $(OBJ_DIR)/db/wide/db_wide_basic_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index e15dc7e311..3aa61918c8 100644 --- a/TARGETS +++ b/TARGETS @@ -5240,6 +5240,12 @@ cpp_unittest_wrapper(name="db_wal_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="db_wide_basic_test", + srcs=["db/wide/db_wide_basic_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="db_with_timestamp_basic_test", srcs=["db/db_with_timestamp_basic_test.cc"], deps=[":rocksdb_test_lib"], diff --git 
a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index 3b167c7451..395b8be424 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -197,6 +197,7 @@ void CompactionIterator::Next() { bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until) { + // TODO: support compaction filter for wide-column entities if (!compaction_filter_ || (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) { return true; @@ -519,7 +520,8 @@ void CompactionIterator::NextFromInput() { // In the previous iteration we encountered a single delete that we could // not compact out. We will keep this Put, but can drop its data. // (See Optimization 3, below.) - if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) { + if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && + ikey_.type != kTypeWideColumnEntity) { ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output", ikey_.DebugString(allow_data_in_errors_, true).c_str()); assert(false); @@ -533,7 +535,7 @@ void CompactionIterator::NextFromInput() { assert(false); } - if (ikey_.type == kTypeBlobIndex) { + if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) { ikey_.type = kTypeValue; current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); } @@ -689,7 +691,8 @@ void CompactionIterator::NextFromInput() { // either way. We will maintain counts of how many mismatches // happened if (next_ikey.type != kTypeValue && - next_ikey.type != kTypeBlobIndex) { + next_ikey.type != kTypeBlobIndex && + next_ikey.type != kTypeWideColumnEntity) { ++iter_stats_.num_single_del_mismatch; } diff --git a/db/compaction/compaction_iterator_test.cc b/db/compaction/compaction_iterator_test.cc index a5d8d30c35..2f02508045 100644 --- a/db/compaction/compaction_iterator_test.cc +++ b/db/compaction/compaction_iterator_test.cc @@ -981,6 +981,21 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest, 2 /*earliest_write_conflict_snapshot*/); } +// Same as above but with a wide-column entity. In addition to the value getting +// trimmed, the type of the KV is changed to kTypeValue. +TEST_F(CompactionIteratorWithSnapshotCheckerTest, + KeepSingleDeletionForWriteConflictChecking_WideColumnEntity) { + AddSnapshot(2, 0); + RunTest({test::KeyStr("a", 2, kTypeSingleDeletion), + test::KeyStr("a", 1, kTypeWideColumnEntity)}, + {"", "fake_entity"}, + {test::KeyStr("a", 2, kTypeSingleDeletion), + test::KeyStr("a", 1, kTypeValue)}, + {"", ""}, 2 /* last_committed_seq */, nullptr /* merge_operator */, + nullptr /* compaction_filter */, false /* bottommost_level */, + 2 /* earliest_write_conflict_snapshot */); +} + // Compaction filter should keep uncommitted key as-is, and // * Convert the latest value to deletion, and/or // * if latest value is a merge, apply filter to all subsequent merges. 
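For orientation before the rest of the diff: a minimal usage sketch of the new API, distilled from the summary above and from the `db/wide/db_wide_basic_test.cc` file added later in this patch. It is illustrative only, not part of the diff; `PutEntityExample` is a hypothetical helper that assumes an already-open `DB* db`.

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/slice.h"
#include "rocksdb/wide_columns.h"
#include "rocksdb/write_batch.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical helper for illustration; assumes `db` is already open.
void PutEntityExample(DB* db) {
  // Write a wide-column entity directly through the DB.
  WideColumns columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
  Status s = db->PutEntity(WriteOptions(), db->DefaultColumnFamily(), "first",
                           columns);
  assert(s.ok());

  // Alternatively, stage the entity in a WriteBatch. Note that PutEntity
  // requires an explicit (non-null) column family handle.
  WriteBatch batch;
  s = batch.PutEntity(db->DefaultColumnFamily(), "second",
                      {{"attr_one", "two"}, {"attr_three", "four"}});
  assert(s.ok());
  s = db->Write(WriteOptions(), &batch);
  assert(s.ok());

  // Per the summary, read APIs currently return NotSupported when they
  // encounter a wide-column entity.
  PinnableSlice value;
  s = db->Get(ReadOptions(), db->DefaultColumnFamily(), "first", &value);
  assert(s.IsNotSupported());
}
```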
diff --git a/db/db_impl/compacted_db_impl.h b/db/db_impl/compacted_db_impl.h index 7a83a3cc5f..5fa412cafe 100644 --- a/db/db_impl/compacted_db_impl.h +++ b/db/db_impl/compacted_db_impl.h @@ -54,12 +54,22 @@ class CompactedDBImpl : public DBImpl { const Slice& /*key*/, const Slice& /*value*/) override { return Status::NotSupported("Not supported in compacted db mode."); } + + using DBImpl::PutEntity; + Status PutEntity(const WriteOptions& /* options */, + ColumnFamilyHandle* /* column_family */, + const Slice& /* key */, + const WideColumns& /* columns */) override { + return Status::NotSupported("Not supported in compacted db mode."); + } + using DBImpl::Merge; virtual Status Merge(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, const Slice& /*value*/) override { return Status::NotSupported("Not supported in compacted db mode."); } + using DBImpl::Delete; virtual Status Delete(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 2008085fda..294db8a9c9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -217,6 +217,11 @@ class DBImpl : public DB { Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& value) override; + using DB::PutEntity; + Status PutEntity(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) override; + using DB::Merge; Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) override; diff --git a/db/db_impl/db_impl_readonly.h b/db/db_impl/db_impl_readonly.h index 3f1ea95a54..21fa1021fd 100644 --- a/db/db_impl/db_impl_readonly.h +++ b/db/db_impl/db_impl_readonly.h @@ -48,6 +48,15 @@ class DBImplReadOnly : public DBImpl { const Slice& /*key*/, const Slice& /*value*/) override { return Status::NotSupported("Not supported operation in read only mode."); } + + using DBImpl::PutEntity; + Status PutEntity(const WriteOptions& /* options */, + ColumnFamilyHandle* /* column_family */, + const Slice& /* key */, + const WideColumns& /* columns */) override { + return Status::NotSupported("Not supported operation in read only mode."); + } + using DBImpl::Merge; virtual Status Merge(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, diff --git a/db/db_impl/db_impl_secondary.h b/db/db_impl/db_impl_secondary.h index 66d6cc5d5e..d03b76f03f 100644 --- a/db/db_impl/db_impl_secondary.h +++ b/db/db_impl/db_impl_secondary.h @@ -119,6 +119,14 @@ class DBImplSecondary : public DBImpl { return Status::NotSupported("Not supported operation in secondary mode."); } + using DBImpl::PutEntity; + Status PutEntity(const WriteOptions& /* options */, + ColumnFamilyHandle* /* column_family */, + const Slice& /* key */, + const WideColumns& /* columns */) override { + return Status::NotSupported("Not supported operation in secondary mode."); + } + using DBImpl::Merge; Status Merge(const WriteOptions& /*options*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index e65ecf75a7..2f46efb9b3 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -37,6 +37,17 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, return DB::Put(o, column_family, key, ts, val); } +Status DBImpl::PutEntity(const WriteOptions& options, + ColumnFamilyHandle* 
column_family, const Slice& key, + const WideColumns& columns) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + + return DB::PutEntity(options, column_family, key, columns); +} + Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { const Status s = FailIfCfHasTs(column_family); @@ -2236,6 +2247,27 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, return Write(opt, &batch); } +Status DB::PutEntity(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) { + const ColumnFamilyHandle* const default_cf = DefaultColumnFamily(); + assert(default_cf); + + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + + WriteBatch batch(/* reserved_bytes */ 0, /* max_bytes */ 0, + options.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + + const Status s = batch.PutEntity(column_family, key, columns); + if (!s.ok()) { + return s; + } + + return Write(options, &batch); +} + Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, const Slice& key) { WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, diff --git a/db/db_iter.cc b/db/db_iter.cc index d9da9624a1..b68a3bcb27 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -207,6 +207,15 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key, return true; } +bool DBIter::SetWideColumnValueIfNeeded(const Slice& /* wide_columns_slice */) { + assert(!is_blob_); + + // TODO: support wide-column entities + status_ = Status::NotSupported("Encountered unexpected wide-column entity"); + valid_ = false; + return false; +} + // PRE: saved_key_ has the current user key if skipping_saved_key // POST: saved_key_ should have the next user key if valid_, // if the current entry is a result of merge @@ -341,6 +350,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, break; case kTypeValue: case kTypeBlobIndex: + case kTypeWideColumnEntity: if (timestamp_lb_) { saved_key_.SetInternalKey(ikey_); @@ -348,6 +358,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) { return false; } + } else if (ikey_.type == kTypeWideColumnEntity) { + if (!SetWideColumnValueIfNeeded(iter_.value())) { + return false; + } } valid_ = true; @@ -369,6 +383,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key, if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) { return false; } + } else if (ikey_.type == kTypeWideColumnEntity) { + if (!SetWideColumnValueIfNeeded(iter_.value())) { + return false; + } } valid_ = true; @@ -580,6 +598,12 @@ bool DBIter::MergeValuesNewToOld() { return false; } return true; + } else if (kTypeWideColumnEntity == ikey.type) { + // TODO: support wide-column entities + status_ = Status::NotSupported( + "Merge currently not supported for wide-column entities"); + valid_ = false; + return false; } else { valid_ = false; status_ = Status::Corruption( @@ -783,7 +807,8 @@ bool DBIter::FindValueForCurrentKey() { merge_context_.Clear(); current_entry_is_merged_ = false; // last entry before merge (could be kTypeDeletion, - // kTypeDeletionWithTimestamp, kTypeSingleDeletion or kTypeValue) + // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue, + // kTypeBlobIndex, or kTypeWideColumnEntity) ValueType last_not_merge_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion; @@ 
-831,6 +856,7 @@ bool DBIter::FindValueForCurrentKey() { switch (last_key_entry_type) { case kTypeValue: case kTypeBlobIndex: + case kTypeWideColumnEntity: if (range_del_agg_.ShouldDelete( ikey, RangeDelPositioningMode::kBackwardTraversal)) { last_key_entry_type = kTypeRangeDeletion; @@ -927,6 +953,12 @@ bool DBIter::FindValueForCurrentKey() { } is_blob_ = false; return true; + } else if (last_not_merge_type == kTypeWideColumnEntity) { + // TODO: support wide-column entities + status_ = Status::NotSupported( + "Merge currently not supported for wide-column entities"); + valid_ = false; + return false; } else { assert(last_not_merge_type == kTypeValue); s = Merge(&pinned_value_, saved_key_.GetUserKey()); @@ -944,6 +976,11 @@ bool DBIter::FindValueForCurrentKey() { return false; } break; + case kTypeWideColumnEntity: + if (!SetWideColumnValueIfNeeded(pinned_value_)) { + return false; + } + break; default: valid_ = false; status_ = Status::Corruption( @@ -1034,13 +1071,18 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_); saved_timestamp_.assign(ts.data(), ts.size()); } - if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { + if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex || + ikey.type == kTypeWideColumnEntity) { assert(iter_.iter()->IsValuePinned()); pinned_value_ = iter_.value(); if (ikey.type == kTypeBlobIndex) { if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) { return false; } + } else if (ikey_.type == kTypeWideColumnEntity) { + if (!SetWideColumnValueIfNeeded(pinned_value_)) { + return false; + } } valid_ = true; @@ -1109,6 +1151,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { } is_blob_ = false; return true; + } else if (ikey.type == kTypeWideColumnEntity) { + // TODO: support wide-column entities + status_ = Status::NotSupported( + "Merge currently not supported for wide-column entities"); + valid_ = false; + return false; } else { valid_ = false; status_ = Status::Corruption( diff --git a/db/db_iter.h b/db/db_iter.h index 1c549bcf7f..975f0e8241 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -298,6 +298,8 @@ class DBIter final : public Iterator { // index when using the integrated BlobDB implementation. 
bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index); + bool SetWideColumnValueIfNeeded(const Slice& wide_columns_slice); + Status Merge(const Slice* val, const Slice& user_key); const SliceTransform* prefix_extractor_; diff --git a/db/db_kv_checksum_test.cc b/db/db_kv_checksum_test.cc index 3bed9d784e..bf5c5b4647 100644 --- a/db/db_kv_checksum_test.cc +++ b/db/db_kv_checksum_test.cc @@ -15,6 +15,7 @@ enum class WriteBatchOpType { kSingleDelete, kDeleteRange, kMerge, + kPutEntity, kNum, }; @@ -62,18 +63,38 @@ std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle, case WriteBatchOpType::kMerge: s = wb.Merge(cf_handle, "key", "val"); break; + case WriteBatchOpType::kPutEntity: + s = wb.PutEntity(cf_handle, "key", + {{"attr_name1", "foo"}, {"attr_name2", "bar"}}); + break; case WriteBatchOpType::kNum: assert(false); } return {std::move(wb), std::move(s)}; } -class DbKvChecksumTest : public DBTestBase, +class DbKvChecksumTestBase : public DBTestBase { + public: + DbKvChecksumTestBase(const std::string& path, bool env_do_fsync) + : DBTestBase(path, env_do_fsync) {} + + ColumnFamilyHandle* GetCFHandleToUse(ColumnFamilyHandle* column_family, + WriteBatchOpType op_type) const { + // Note: PutEntity cannot be called without column family + if (op_type == WriteBatchOpType::kPutEntity && !column_family) { + return db_->DefaultColumnFamily(); + } + + return column_family; + } +}; + +class DbKvChecksumTest : public DbKvChecksumTestBase, public ::testing::WithParamInterface< std::tuple<WriteBatchOpType, char, WriteMode>> { public: DbKvChecksumTest() - : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { + : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { op_type_ = std::get<0>(GetParam()); corrupt_byte_addend_ = std::get<1>(GetParam()); write_mode_ = std::get<2>(GetParam()); @@ -82,14 +103,16 @@ class DbKvChecksumTest : public DBTestBase, Status ExecuteWrite(ColumnFamilyHandle* cf_handle) { switch (write_mode_) { case WriteMode::kWriteProtectedBatch: { - auto batch_and_status = GetWriteBatch( - cf_handle, 8 /* protection_bytes_per_key */, op_type_); + auto batch_and_status = + GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_), + 8 /* protection_bytes_per_key */, op_type_); assert(batch_and_status.second.ok()); return db_->Write(WriteOptions(), &batch_and_status.first); } case WriteMode::kWriteUnprotectedBatch: { - auto batch_and_status = GetWriteBatch( - cf_handle, 0 /* protection_bytes_per_key */, op_type_); + auto batch_and_status = + GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_), + 0 /* protection_bytes_per_key */, op_type_); assert(batch_and_status.second.ok()); WriteOptions write_opts; write_opts.protection_bytes_per_key = 8; @@ -135,10 +158,10 @@ std::string GetOpTypeString(const WriteBatchOpType& op_type) { return "SingleDelete"; case WriteBatchOpType::kDeleteRange: return "DeleteRange"; - break; case WriteBatchOpType::kMerge: return "Merge"; - break; + case WriteBatchOpType::kPutEntity: + return "PutEntity"; case WriteBatchOpType::kNum: assert(false); } @@ -242,7 +265,8 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) { TEST_P(DbKvChecksumTest, NoCorruptionCase) { // If this test fails, we may have found a piece of malfunctioned hardware auto batch_and_status = - GetWriteBatch(nullptr, 8 /* protection_bytes_per_key */, op_type_); + GetWriteBatch(GetCFHandleToUse(nullptr, op_type_), + 8 /* protection_bytes_per_key */, op_type_); ASSERT_OK(batch_and_status.second); ASSERT_OK(batch_and_status.first.VerifyChecksum()); } @@ -323,12 +347,12 @@ 
TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) { } class DbKvChecksumTestMergedBatch - : public DBTestBase, + : public DbKvChecksumTestBase, public ::testing::WithParamInterface< std::tuple<WriteBatchOpType, WriteBatchOpType, char>> { public: DbKvChecksumTestMergedBatch() - : DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { + : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { op_type1_ = std::get<0>(GetParam()); op_type2_ = std::get<1>(GetParam()); corrupt_byte_addend_ = std::get<2>(GetParam()); @@ -349,10 +373,10 @@ void CorruptWriteBatch(Slice* content, size_t offset, TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) { // Verify write batch checksum after write batch append - auto batch1 = GetWriteBatch(nullptr /* cf_handle */, + auto batch1 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_), 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(batch1.second); - auto batch2 = GetWriteBatch(nullptr /* cf_handle */, + auto batch2 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_), 8 /* protection_bytes_per_key */, op_type2_); ASSERT_OK(batch2.second); ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first)); @@ -374,11 +398,13 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { options.merge_operator = MergeOperators::CreateStringAppendOperator(); } - auto leader_batch_and_status = GetWriteBatch( - nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_); + auto leader_batch_and_status = + GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_), + 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); - auto follower_batch_and_status = GetWriteBatch( - nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type2_); + auto follower_batch_and_status = + GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_), + 8 /* protection_bytes_per_key */, op_type2_); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t total_bytes = leader_batch_size + follower_batch_and_status.first.GetDataSize(); @@ -419,7 +445,7 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { // follower follower_thread = port::Thread([&]() { follower_batch_and_status = - GetWriteBatch(nullptr /* cf_handle */, + GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_), 8 /* protection_bytes_per_key */, op_type2_); ASSERT_OK(follower_batch_and_status.second); ASSERT_TRUE( @@ -443,8 +469,9 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) { Reopen(options); SyncPoint::GetInstance()->EnableProcessing(); auto log_size_pre_write = dbfull()->TEST_total_log_size(); - leader_batch_and_status = GetWriteBatch( - nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_); + leader_batch_and_status = + GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_), + 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) .IsCorruption()); @@ -484,10 +511,12 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { CreateAndReopenWithCF({"ramen"}, options); auto leader_batch_and_status = - GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_); + GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_), + 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); auto follower_batch_and_status = - GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type2_); + GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_), + 8 /* protection_bytes_per_key 
*/, op_type2_); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t total_bytes = leader_batch_size + follower_batch_and_status.first.GetDataSize(); @@ -527,8 +556,9 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { // Start the other writer thread which will join the write group as // follower follower_thread = port::Thread([&]() { - follower_batch_and_status = GetWriteBatch( - handles_[1], 8 /* protection_bytes_per_key */, op_type2_); + follower_batch_and_status = + GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_), + 8 /* protection_bytes_per_key */, op_type2_); ASSERT_OK(follower_batch_and_status.second); ASSERT_TRUE( db_->Write(WriteOptions(), &follower_batch_and_status.first) @@ -553,7 +583,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) { SyncPoint::GetInstance()->EnableProcessing(); auto log_size_pre_write = dbfull()->TEST_total_log_size(); leader_batch_and_status = - GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_); + GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_), + 8 /* protection_bytes_per_key */, op_type1_); ASSERT_OK(leader_batch_and_status.second); ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) .IsCorruption()); diff --git a/db/db_test.cc b/db/db_test.cc index a47e8fdb48..1e430d08c8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2897,6 +2897,15 @@ class ModelDB : public DB { const Slice& /*v*/) override { return Status::NotSupported(); } + + using DB::PutEntity; + Status PutEntity(const WriteOptions& /* options */, + ColumnFamilyHandle* /* column_family */, + const Slice& /* key */, + const WideColumns& /* columns */) override { + return Status::NotSupported(); + } + using DB::Close; Status Close() override { return Status::OK(); } using DB::Delete; diff --git a/db/memtable.cc b/db/memtable.cc index f97bb292e1..e011641884 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -740,21 +740,33 @@ static bool SaveValue(void* arg, const char* entry) { s->seq = seq; - if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || + type == kTypeWideColumnEntity) && max_covering_tombstone_seq > seq) { type = kTypeRangeDeletion; } switch (type) { case kTypeBlobIndex: - if (s->is_blob_index == nullptr) { - ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); - *(s->status) = Status::NotSupported( - "Encounter unsupported blob value. Please open DB with " - "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); - } else if (*(s->merge_in_progress)) { + case kTypeWideColumnEntity: + if (*(s->merge_in_progress)) { + *(s->status) = Status::NotSupported("Merge operator not supported"); + } else if (!s->do_merge) { + *(s->status) = Status::NotSupported("GetMergeOperands not supported"); + } else if (type == kTypeBlobIndex) { + if (s->is_blob_index == nullptr) { + ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); + *(s->status) = Status::NotSupported( + "Encounter unsupported blob value. 
Please open DB with " + "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); + } + } else { + assert(type == kTypeWideColumnEntity); + + // TODO: support wide-column entities *(s->status) = - Status::NotSupported("Blob DB does not support merge operator."); + Status::NotSupported("Encountered unexpected wide-column entity"); } + if (!s->status->ok()) { *(s->found_final_value) = true; return false; diff --git a/db/merge_helper.cc b/db/merge_helper.cc index 2178768490..5430509ebf 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -212,11 +212,16 @@ Status MergeHelper::MergeUntil(InternalIterator* iter, const Slice val = iter->value(); PinnableSlice blob_value; const Slice* val_ptr; - if ((kTypeValue == ikey.type || kTypeBlobIndex == ikey.type) && + if ((kTypeValue == ikey.type || kTypeBlobIndex == ikey.type || + kTypeWideColumnEntity == ikey.type) && (range_del_agg == nullptr || !range_del_agg->ShouldDelete( ikey, RangeDelPositioningMode::kForwardTraversal))) { - if (ikey.type == kTypeBlobIndex) { + if (ikey.type == kTypeWideColumnEntity) { + // TODO: support wide-column entities + return Status::NotSupported( + "Merge currently not supported for wide-column entities"); + } else if (ikey.type == kTypeBlobIndex) { BlobIndex blob_index; s = blob_index.DecodeFrom(val); diff --git a/db/version_set.cc b/db/version_set.cc index 66d55d4f76..ea91a9b1e5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2161,6 +2161,10 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k, "Encounter unexpected blob index. Please open DB with " "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); return; + case GetContext::kUnexpectedWideColumnEntity: + *status = + Status::NotSupported("Encountered unexpected wide-column entity"); + return; } f = fp.GetNextFile(); } diff --git a/db/version_set_sync_and_async.h b/db/version_set_sync_and_async.h index 229c59a6e2..e06cc08da3 100644 --- a/db/version_set_sync_and_async.h +++ b/db/version_set_sync_and_async.h @@ -141,6 +141,11 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); file_range.MarkKeyDone(iter); continue; + case GetContext::kUnexpectedWideColumnEntity: + *status = + Status::NotSupported("Encountered unexpected wide-column entity"); + file_range.MarkKeyDone(iter); + continue; } } diff --git a/db/wide/db_wide_basic_test.cc b/db/wide/db_wide_basic_test.cc new file mode 100644 index 0000000000..89fe5c237b --- /dev/null +++ b/db/wide/db_wide_basic_test.cc @@ -0,0 +1,213 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#include <array> +#include <memory> + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "test_util/testutil.h" +#include "utilities/merge_operators.h" + +namespace ROCKSDB_NAMESPACE { + +class DBWideBasicTest : public DBTestBase { + protected: + explicit DBWideBasicTest() + : DBTestBase("db_wide_basic_test", /* env_do_fsync */ false) {} +}; + +TEST_F(DBWideBasicTest, PutEntity) { + Options options = GetDefaultOptions(); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + + WriteBatch batch; + ASSERT_OK( + batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + + // Note: currently, read APIs are supposed to return NotSupported + auto verify = [&]() { + { + PinnableSlice result; + ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key, + &result) + .IsNotSupported()); + } + + { + PinnableSlice result; + ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), + second_key, &result) + .IsNotSupported()); + } + + { + constexpr size_t num_keys = 2; + + std::array<Slice, num_keys> keys{{first_key, second_key}}; + std::array<PinnableSlice, num_keys> values; + std::array<Status, num_keys> statuses; + + db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, + &keys[0], &values[0], &statuses[0]); + + ASSERT_TRUE(values[0].empty()); + ASSERT_TRUE(statuses[0].IsNotSupported()); + + ASSERT_TRUE(values[1].empty()); + ASSERT_TRUE(statuses[1].IsNotSupported()); + } + + { + std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions())); + + iter->SeekToFirst(); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsNotSupported()); + + iter->SeekToLast(); + ASSERT_FALSE(iter->Valid()); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + }; + + // Try reading from memtable + verify(); + + // Try reading after recovery + Close(); + options.avoid_flush_during_recovery = true; + Reopen(options); + + verify(); + + // Try reading from storage + ASSERT_OK(Flush()); + + verify(); + + // Add a couple of merge operands + Close(); + options.merge_operator = MergeOperators::CreateStringAppendOperator(); + Reopen(options); + + constexpr char merge_operand[] = "bla"; + + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, + merge_operand)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, + merge_operand)); + + // Try reading from memtable + verify(); + + // Try reading from storage + ASSERT_OK(Flush()); + + verify(); + + // Do it again, with the Put and the Merge in the same memtable + ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key, + merge_operand)); + ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key, + merge_operand)); + + // Try reading from memtable + verify(); +} + +TEST_F(DBWideBasicTest, PutEntityColumnFamily) { + Options options = GetDefaultOptions(); + CreateAndReopenWithCF({"corinthian"}, options); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_OK( 
db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns)); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}}; + + WriteBatch batch; + ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns)); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +TEST_F(DBWideBasicTest, PutEntityTimestampError) { + // Note: timestamps are currently not supported + + Options options = GetDefaultOptions(); + options.comparator = test::BytewiseComparatorWithU64TsWrapper(); + + ColumnFamilyHandle* handle = nullptr; + ASSERT_OK(db_->CreateColumnFamily(options, "corinthian", &handle)); + std::unique_ptr<ColumnFamilyHandle> handle_guard(handle); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}}; + + ASSERT_TRUE(db_->PutEntity(WriteOptions(), handle, first_key, first_columns) + .IsInvalidArgument()); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"doric", "column"}, {"ionic", "column"}}; + + WriteBatch batch; + ASSERT_TRUE( + batch.PutEntity(handle, second_key, second_columns).IsInvalidArgument()); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +TEST_F(DBWideBasicTest, PutEntitySerializationError) { + // Make sure duplicate columns are caught + + Options options = GetDefaultOptions(); + + // Use the DB::PutEntity API + constexpr char first_key[] = "first"; + WideColumns first_columns{{"foo", "bar"}, {"foo", "baz"}}; + + ASSERT_TRUE(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), + first_key, first_columns) + .IsCorruption()); + + // Use WriteBatch + constexpr char second_key[] = "second"; + WideColumns second_columns{{"column", "doric"}, {"column", "ionic"}}; + + WriteBatch batch; + ASSERT_TRUE( + batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns) + .IsCorruption()); + ASSERT_OK(db_->Write(WriteOptions(), &batch)); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + RegisterCustomObjects(argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/wide/wide_column_serialization.cc b/db/wide/wide_column_serialization.cc index f44b81e2cb..fb002366b6 100644 --- a/db/wide/wide_column_serialization.cc +++ b/db/wide/wide_column_serialization.cc @@ -17,12 +17,6 @@ namespace ROCKSDB_NAMESPACE { Status WideColumnSerialization::Serialize(const WideColumns& columns, std::string& output) { - // Column names should be strictly ascending - assert(std::adjacent_find(columns.cbegin(), columns.cend(), - [](const WideColumn& lhs, const WideColumn& rhs) { - return lhs.name().compare(rhs.name()) > 0; - }) == columns.cend()); - if (columns.size() > static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { return Status::InvalidArgument("Too many wide columns"); } @@ -32,12 +26,17 @@ Status WideColumnSerialization::Serialize(const WideColumns& columns, PutVarint32(&output, static_cast<uint32_t>(columns.size())); - for (const auto& column : columns) { + for (size_t i = 0; i < columns.size(); ++i) { + const WideColumn& column = columns[i]; + const Slice& name = column.name(); if (name.size() > static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { return Status::InvalidArgument("Wide column name too long"); } + if (i > 0 && columns[i - 1].name().compare(name) >= 0) { + return Status::Corruption("Wide columns out of order"); + } const Slice& value = column.value(); if (value.size() > diff 
--git a/db/wide/wide_column_serialization_test.cc b/db/wide/wide_column_serialization_test.cc index 8421a8f61d..87d0d647e4 100644 --- a/db/wide/wide_column_serialization_test.cc +++ b/db/wide/wide_column_serialization_test.cc @@ -124,6 +124,22 @@ TEST(WideColumnSerializationTest, SerializeDeserialize) { } } +TEST(WideColumnSerializationTest, SerializeDuplicateError) { + WideColumns columns{{"foo", "bar"}, {"foo", "baz"}}; + std::string output; + + ASSERT_TRUE( + WideColumnSerialization::Serialize(columns, output).IsCorruption()); +} + +TEST(WideColumnSerializationTest, SerializeOutOfOrderError) { + WideColumns columns{{"hello", "world"}, {"foo", "bar"}}; + std::string output; + + ASSERT_TRUE( + WideColumnSerialization::Serialize(columns, output).IsCorruption()); +} + TEST(WideColumnSerializationTest, DeserializeVersionError) { // Can't decode version diff --git a/db/write_batch.cc b/db/write_batch.cc index 34a8025385..5a71ad7826 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -29,6 +29,8 @@ // kTypeRollbackXID varstring // kTypeBeginPersistedPrepareXID // kTypeBeginUnprepareXID +// kTypeWideColumnEntity varstring varstring +// kTypeColumnFamilyWideColumnEntity varint32 varstring varstring // kTypeNoop // varstring := // len: varint32 @@ -36,6 +38,8 @@ #include "rocksdb/write_batch.h" +#include <algorithm> +#include <limits> #include <map> #include <stack> #include <stdexcept> @@ -52,6 +56,7 @@ #include "db/merge_context.h" #include "db/snapshot_impl.h" #include "db/trim_history_scheduler.h" +#include "db/wide/wide_column_serialization.h" #include "db/write_batch_internal.h" #include "monitoring/perf_context_imp.h" #include "monitoring/statistics.h" @@ -82,6 +87,7 @@ enum ContentFlags : uint32_t { HAS_DELETE_RANGE = 1 << 9, HAS_BLOB_INDEX = 1 << 10, HAS_BEGIN_UNPREPARE = 1 << 11, + HAS_PUT_ENTITY = 1 << 12, }; struct BatchContentClassifier : public WriteBatch::Handler { @@ -92,6 +98,12 @@ struct BatchContentClassifier : public WriteBatch::Handler { return Status::OK(); } + Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */, + const Slice& /* entity */) override { + content_flags |= ContentFlags::HAS_PUT_ENTITY; + return Status::OK(); + } + Status DeleteCF(uint32_t, const Slice&) override { content_flags |= ContentFlags::HAS_DELETE; return Status::OK(); @@ -287,6 +299,10 @@ bool WriteBatch::HasPut() const { return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; } +bool WriteBatch::HasPutEntity() const { return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY) != 0; } + bool WriteBatch::HasDelete() const { return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0; } @@ -435,6 +451,17 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag, return Status::Corruption("bad Rollback XID"); } break; + case kTypeColumnFamilyWideColumnEntity: + if (!GetVarint32(input, column_family)) { + return Status::Corruption("bad WriteBatch PutEntity"); + } + FALLTHROUGH_INTENDED; + case kTypeWideColumnEntity: + if (!GetLengthPrefixedSlice(input, key) || + !GetLengthPrefixedSlice(input, value)) { + return Status::Corruption("bad WriteBatch PutEntity"); + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -462,6 +489,7 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size()); Slice key, value, blob, xid; + // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as // the batch boundary symbols otherwise we would mis-count the number of // batches. 
We do that by checking whether the accumulated batch is empty @@ -661,6 +689,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb, assert(s.ok()); empty_batch = true; break; + case kTypeWideColumnEntity: + case kTypeColumnFamilyWideColumnEntity: + assert(wb->content_flags_.load(std::memory_order_relaxed) & + (ContentFlags::DEFERRED | ContentFlags::HAS_PUT_ENTITY)); + s = handler->PutEntityCF(column_family, key, value); + if (LIKELY(s.ok())) { + empty_batch = false; + ++found; + } + break; default: return Status::Corruption("unknown WriteBatch tag"); } @@ -891,6 +929,86 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, "Cannot call this method on column family enabling timestamp"); } +Status WriteBatchInternal::PutEntity(WriteBatch* b, uint32_t column_family_id, + const Slice& key, + const WideColumns& columns) { + assert(b); + + if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("key is too large"); + } + + WideColumns sorted_columns(columns); + std::sort(sorted_columns.begin(), sorted_columns.end(), + [](const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name().compare(rhs.name()) < 0; + }); + + std::string entity; + const Status s = WideColumnSerialization::Serialize(sorted_columns, entity); + if (!s.ok()) { + return s; + } + + if (entity.size() > size_t{std::numeric_limits<uint32_t>::max()}) { + return Status::InvalidArgument("wide column entity is too large"); + } + + LocalSavePoint save(b); + + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeWideColumnEntity)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyWideColumnEntity)); + PutVarint32(&b->rep_, column_family_id); + } + + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, entity); + + b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) | + ContentFlags::HAS_PUT_ENTITY, + std::memory_order_relaxed); + + if (b->prot_info_ != nullptr) { + b->prot_info_->entries_.emplace_back( + ProtectionInfo64() + .ProtectKVO(key, entity, kTypeWideColumnEntity) + .ProtectC(column_family_id)); + } + + return save.commit(); +} + +Status WriteBatch::PutEntity(ColumnFamilyHandle* column_family, + const Slice& key, const WideColumns& columns) { + if (!column_family) { + return Status::InvalidArgument( + "Cannot call this method without a column family handle"); + } + + Status s; + uint32_t cf_id = 0; + size_t ts_sz = 0; + + std::tie(s, cf_id, ts_sz) = + WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this, + column_family); + + if (!s.ok()) { + return s; + } + + if (ts_sz) { + return Status::InvalidArgument( + "Cannot call this method on column family enabling timestamp"); + } + + return WriteBatchInternal::PutEntity(this, cf_id, key, columns); +} + Status WriteBatchInternal::InsertNoop(WriteBatch* b) { b->rep_.push_back(static_cast<char>(kTypeNoop)); return Status::OK(); } @@ -1556,6 +1674,10 @@ Status WriteBatch::VerifyChecksum() const { case kTypeCommitXIDAndTimestamp: checksum_protected = false; break; + case kTypeColumnFamilyWideColumnEntity: + case kTypeWideColumnEntity: + tag = kTypeWideColumnEntity; + break; default: return Status::Corruption( "unknown WriteBatch tag", @@ -1994,6 +2116,29 @@ class MemTableInserter : public WriteBatch::Handler { return ret_status; } + Status PutEntityCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + const auto* kv_prot_info = NextProtectionInfo(); + + Status 
s; + if (kv_prot_info) { + // Memtable needs seqno, doesn't need CF ID + auto mem_kv_prot_info = + kv_prot_info->StripC(column_family_id).ProtectS(sequence_); + s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity, + &mem_kv_prot_info); + } else { + s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity, + /* kv_prot_info */ nullptr); + } + + if (UNLIKELY(s.IsTryAgain())) { + DecrementProtectionInfoIdxForTryAgain(); + } + + return s; + } + Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key, const Slice& value, ValueType delete_type, const ProtectionInfoKVOS64* kv_prot_info) { @@ -2780,6 +2925,11 @@ class ProtectionInfoUpdater : public WriteBatch::Handler { return UpdateProtInfo(cf, key, val, kTypeValue); } + Status PutEntityCF(uint32_t cf, const Slice& key, + const Slice& entity) override { + return UpdateProtInfo(cf, key, entity, kTypeWideColumnEntity); + } + Status DeleteCF(uint32_t cf, const Slice& key) override { return UpdateProtInfo(cf, key, "", kTypeDeletion); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 53e83a23e7..9bc5ab98a2 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -88,6 +88,9 @@ class WriteBatchInternal { static Status Put(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static Status PutEntity(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const WideColumns& columns); + static Status Delete(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 378cd873a4..44d455c43e 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -27,6 +27,7 @@ #include "rocksdb/transaction_log.h" #include "rocksdb/types.h" #include "rocksdb/version.h" +#include "rocksdb/wide_columns.h" #ifdef _WIN32 // Windows API macro interference @@ -406,6 +407,11 @@ class DB { return Put(options, DefaultColumnFamily(), key, ts, value); } + // UNDER CONSTRUCTION -- DO NOT USE + virtual Status PutEntity(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) = 0; + // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" // did not exist in the database. 
diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 5bdd8d4a6a..d73d03e65d 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -85,6 +85,13 @@ class StackableDB : public DB { return db_->Put(options, column_family, key, ts, val); } + using DB::PutEntity; + Status PutEntity(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) override { + return db_->PutEntity(options, column_family, key, columns); + } + using DB::Get; virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 21974e67a1..84dc11a312 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -113,6 +113,17 @@ class WriteBatchWithIndex : public WriteBatchBase { Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& ts, const Slice& value) override; + Status PutEntity(ColumnFamilyHandle* column_family, const Slice& /* key */, + const WideColumns& /* columns */) override { + if (!column_family) { + return Status::InvalidArgument( + "Cannot call this method without a column family handle"); + } + + return Status::NotSupported( + "PutEntity not supported by WriteBatchWithIndex"); + } + using WriteBatchBase::Merge; Status Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) override; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index f4838272be..e722bfe82e 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -100,6 +100,11 @@ class WriteBatch : public WriteBatchBase { return Put(nullptr, key, value); } + // UNDER CONSTRUCTION -- DO NOT USE + using WriteBatchBase::PutEntity; + Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) override; + using WriteBatchBase::Delete; // If the database contains a mapping for "key", erase it. Else do nothing. 
// The following Delete(..., const Slice& key) can be used when user-defined @@ -240,6 +245,12 @@ } virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {} + virtual Status PutEntityCF(uint32_t /* column_family_id */, + const Slice& /* key */, + const Slice& /* entity */) { + return Status::NotSupported("PutEntityCF not implemented"); + } + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { if (column_family_id == 0) { Delete(key); @@ -346,6 +357,9 @@ class WriteBatch : public WriteBatchBase { // Returns true if PutCF will be called during Iterate bool HasPut() const; + // Returns true if PutEntityCF will be called during Iterate + bool HasPutEntity() const; + // Returns true if DeleteCF will be called during Iterate bool HasDelete() const; diff --git a/include/rocksdb/write_batch_base.h b/include/rocksdb/write_batch_base.h index 480a922e30..33dae4f085 100644 --- a/include/rocksdb/write_batch_base.h +++ b/include/rocksdb/write_batch_base.h @@ -11,6 +11,7 @@ #include <cstddef> #include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/wide_columns.h" namespace ROCKSDB_NAMESPACE { @@ -41,6 +42,10 @@ class WriteBatchBase { const SliceParts& value); virtual Status Put(const SliceParts& key, const SliceParts& value); + // UNDER CONSTRUCTION -- DO NOT USE + virtual Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) = 0; + // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, diff --git a/src.mk b/src.mk index c2bd53dbb3..b7a5325b39 100644 --- a/src.mk +++ b/src.mk @@ -503,6 +503,7 @@ TEST_MAIN_SOURCES = \ db/version_edit_test.cc \ db/version_set_test.cc \ db/wal_manager_test.cc \ + db/wide/db_wide_basic_test.cc \ db/wide/wide_column_serialization_test.cc \ db/write_batch_test.cc \ db/write_callback_test.cc \ diff --git a/table/block_based/block.cc b/table/block_based/block.cc index ef02bc869c..7eb0b010f2 100644 --- a/table/block_based/block.cc +++ b/table/block_based/block.cc @@ -307,10 +307,11 @@ void MetaBlockIter::SeekImpl(const Slice& target) { // target = "seek_user_key @ type | seqno". // // For any type other than kTypeValue, kTypeDeletion, kTypeSingleDeletion, -// or kTypeBlobIndex, this function behaves identically as Seek(). +// kTypeBlobIndex, or kTypeWideColumnEntity, this function behaves identically +// to Seek(). // // For any type in kTypeValue, kTypeDeletion, kTypeSingleDeletion, -// or kTypeBlobIndex: +// kTypeBlobIndex, or kTypeWideColumnEntity: // // If the return value is FALSE, iter location is undefined, and it means: // 1) there is no key in this block falling into the range: @@ -412,7 +413,8 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) { if (value_type != ValueType::kTypeValue && value_type != ValueType::kTypeDeletion && value_type != ValueType::kTypeSingleDeletion && - value_type != ValueType::kTypeBlobIndex) { + value_type != ValueType::kTypeBlobIndex && + value_type != ValueType::kTypeWideColumnEntity) { SeekImpl(target); return true; } diff --git a/table/get_context.cc b/table/get_context.cc index a8163f19d3..b0847ea5d3 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -241,7 +241,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, auto type = parsed_key.type; // Key matches. 
Process it - if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && + if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || + type == kTypeWideColumnEntity) && max_covering_tombstone_seq_ != nullptr && *max_covering_tombstone_seq_ > parsed_key.sequence) { type = kTypeRangeDeletion; @@ -249,15 +250,24 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, switch (type) { case kTypeValue: case kTypeBlobIndex: + case kTypeWideColumnEntity: assert(state_ == kNotFound || state_ == kMerge); - if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { - // Blob value not supported. Stop. - state_ = kUnexpectedBlobIndex; + if (type == kTypeBlobIndex) { + if (is_blob_index_ == nullptr) { + // Blob value not supported. Stop. + state_ = kUnexpectedBlobIndex; + return false; + } + } else if (type == kTypeWideColumnEntity) { + // TODO: support wide-column entities + state_ = kUnexpectedWideColumnEntity; return false; } + if (is_blob_index_ != nullptr) { *is_blob_index_ = (type == kTypeBlobIndex); } + if (kNotFound == state_) { state_ = kFound; if (do_merge_) { @@ -276,20 +286,25 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, // It means this function is called as part of DB GetMergeOperands // API and the current value should be part of // merge_context_->operand_list - if (is_blob_index_ != nullptr && *is_blob_index_) { + if (type == kTypeBlobIndex) { PinnableSlice pin_val; if (GetBlobValue(value, &pin_val) == false) { return false; } Slice blob_value(pin_val); push_operand(blob_value, nullptr); + } else if (type == kTypeWideColumnEntity) { + // TODO: support wide-column entities + state_ = kUnexpectedWideColumnEntity; + return false; } else { + assert(type == kTypeValue); push_operand(value, value_pinner); } } } else if (kMerge == state_) { assert(merge_operator_ != nullptr); - if (is_blob_index_ != nullptr && *is_blob_index_) { + if (type == kTypeBlobIndex) { PinnableSlice pin_val; if (GetBlobValue(value, &pin_val) == false) { return false; @@ -304,7 +319,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, // merge_context_->operand_list push_operand(blob_value, nullptr); } + } else if (type == kTypeWideColumnEntity) { + // TODO: support wide-column entities + state_ = kUnexpectedWideColumnEntity; + return false; } else { + assert(type == kTypeValue); + state_ = kFound; if (do_merge_) { Merge(&value); diff --git a/table/get_context.h b/table/get_context.h index 636af6e619..31157c4e37 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -74,6 +74,8 @@ class GetContext { kCorrupt, kMerge, // saver contains the current merge result (the operands) kUnexpectedBlobIndex, + // TODO: remove once wide-column entities are supported by Get/MultiGet + kUnexpectedWideColumnEntity, }; GetContextStats get_context_stats_;
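One behavioral detail worth spelling out from the `write_batch.cc` and `wide_column_serialization.cc` hunks above: `WriteBatch::PutEntity` sorts the columns by name before serializing them, and the serializer rejects non-ascending (i.e., duplicate) names with `Status::Corruption` (see the `PutEntitySerializationError` test). A hedged sketch of the resulting contract; `PutEntityOrderingExample` is a hypothetical helper assuming an already-open `DB* db`.

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/wide_columns.h"
#include "rocksdb/write_batch.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical helper for illustration; assumes `db` is already open.
void PutEntityOrderingExample(DB* db) {
  WriteBatch batch;

  // Unsorted input is accepted: PutEntity sorts the columns by name before
  // handing them to WideColumnSerialization::Serialize.
  WideColumns unsorted{{"b", "2"}, {"a", "1"}};
  assert(batch.PutEntity(db->DefaultColumnFamily(), "k1", unsorted).ok());

  // Duplicate names end up adjacent after sorting, and the serializer
  // reports them as corruption ("Wide columns out of order").
  WideColumns duplicates{{"a", "1"}, {"a", "2"}};
  assert(batch.PutEntity(db->DefaultColumnFamily(), "k2", duplicates)
             .IsCorruption());
}
```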