diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index dbf6e46a23..b1ea2e33c8 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -4011,8 +4011,7 @@ std::unique_ptr DBImpl::NewMultiCfIterator( cfh_iter_pairs; cfh_iter_pairs.reserve(column_families.size()); for (size_t i = 0; i < column_families.size(); ++i) { - cfh_iter_pairs.emplace_back(column_families[i], - std::unique_ptr(child_iterators[i])); + cfh_iter_pairs.emplace_back(column_families[i], child_iterators[i]); } return std::make_unique(_read_options, diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 4a52d1d6c9..1fa7156323 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -477,6 +477,21 @@ class Transaction { virtual Iterator* GetIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) = 0; + // Returns a multi-column-family attribute group iterator for the given column + // families that includes both keys in the DB and uncommitted keys in this + // transaction. + // + // Setting read_options.snapshot will affect what is read from the + // DB but will NOT change which keys are read from this transaction (the keys + // in this transaction do not yet belong to any snapshot and will be fetched + // regardless). + // + // The returned iterator is only valid until Commit(), Rollback(), or + // RollbackToSavePoint() is called. + virtual std::unique_ptr GetAttributeGroupIterator( + const ReadOptions& read_options, + const std::vector& column_families) = 0; + // Put, PutEntity, Merge, Delete, and SingleDelete behave similarly to the // corresponding functions in WriteBatch, but will also do conflict checking // on the keys being written. diff --git a/unreleased_history/public_api_changes/txn_get_attribute_group_iterator.md b/unreleased_history/public_api_changes/txn_get_attribute_group_iterator.md new file mode 100644 index 0000000000..2dad16d679 --- /dev/null +++ b/unreleased_history/public_api_changes/txn_get_attribute_group_iterator.md @@ -0,0 +1 @@ +Added a new API `Transaction::GetAttributeGroupIterator` that can be used to create a multi-column-family attribute group iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions. diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 833f03ce91..4d1c52df35 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -2222,6 +2222,241 @@ TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) { } } +TEST_P(OptimisticTransactionTest, AttributeGroupIterator) { + ColumnFamilyOptions cf_opts; + cf_opts.enable_blob_files = true; + + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + // Note: "cf1" keys are present only in CF1; "cf2" keys are only present in + // CF2; "cf12" keys are present in both CFs. "a" keys are present only in the + // database; "b" keys are present only in the transaction; "c" keys are + // present in both the database and the transaction. The values indicate the + // column family as well as whether the entry came from the database or the + // transaction. + + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1")); + + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2")); + + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2")); + + ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1)); + ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2)); + + std::unique_ptr txn(txn_db->BeginTransaction(WriteOptions())); + + ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1")); + + ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2")); + + ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2")); + + auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) { + ReadOptions read_options; + read_options.allow_unprepared_value = allow_unprepared_value; + + std::unique_ptr iter( + txn->GetAttributeGroupIterator(read_options, {cfh1, cfh2})); + + { + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_a"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_a_db_cf1"}}; + WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_a_db_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}, + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_b"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_b_txn_cf1"}}; + WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_b_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}, + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_c"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_c_txn_cf1"}}; + WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_c_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}, + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_a"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_a_db_cf1"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_b"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_b_txn_cf1"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_c"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_c_txn_cf1"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_a"); + + prepare_if_needed(iter.get()); + + WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_a_db_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_b"); + + prepare_if_needed(iter.get()); + + WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_b_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_c"); + + prepare_if_needed(iter.get()); + + WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_c_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + verify(/* allow_unprepared_value */ false, [](AttributeGroupIterator*) {}); + verify(/* allow_unprepared_value */ true, [](AttributeGroupIterator* iter) { + ASSERT_TRUE(iter->attribute_groups().empty()); + ASSERT_TRUE(iter->PrepareValue()); + }); +} + +TEST_P(OptimisticTransactionTest, AttributeGroupIteratorSanityChecks) { + ColumnFamilyOptions cf1_opts; + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf1_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyOptions cf2_opts; + cf2_opts.comparator = ReverseBytewiseComparator(); + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf2_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + std::unique_ptr txn(txn_db->BeginTransaction(WriteOptions())); + + { + std::unique_ptr iter( + txn->GetAttributeGroupIterator(ReadOptions(), {})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + std::unique_ptr iter( + txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + + std::unique_ptr iter( + txn->GetAttributeGroupIterator(read_options, {cfh1})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } +} + INSTANTIATE_TEST_CASE_P( InstanceOccGroup, OptimisticTransactionTest, testing::Values(OccValidationPolicy::kValidateSerial, diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 5b1498bbae..3fe511ddc8 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -7,6 +7,7 @@ #include +#include "db/attribute_group_iterator_impl.h" #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "logging/logging.h" @@ -486,6 +487,64 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options, &read_options); } +template +std::unique_ptr TransactionBaseImpl::NewMultiCfIterator( + const ReadOptions& read_options, + const std::vector& column_families, + ErrorIteratorFuncType error_iterator_func) { + if (column_families.empty()) { + return error_iterator_func( + Status::InvalidArgument("No Column Family was provided")); + } + + const Comparator* const first_comparator = + column_families[0]->GetComparator(); + assert(first_comparator); + + for (size_t i = 1; i < column_families.size(); ++i) { + const Comparator* cf_comparator = column_families[i]->GetComparator(); + assert(cf_comparator); + + if (first_comparator != cf_comparator && + first_comparator->GetId() != cf_comparator->GetId()) { + return error_iterator_func(Status::InvalidArgument( + "Different comparators are being used across CFs")); + } + } + + std::vector child_iterators; + const Status s = + db_->NewIterators(read_options, column_families, &child_iterators); + if (!s.ok()) { + return error_iterator_func(s); + } + + assert(column_families.size() == child_iterators.size()); + + std::vector>> + cfh_iter_pairs; + cfh_iter_pairs.reserve(column_families.size()); + for (size_t i = 0; i < column_families.size(); ++i) { + cfh_iter_pairs.emplace_back( + column_families[i], + write_batch_.NewIteratorWithBase(column_families[i], child_iterators[i], + &read_options)); + } + + return std::make_unique(read_options, + column_families[0]->GetComparator(), + std::move(cfh_iter_pairs)); +} + +std::unique_ptr +TransactionBaseImpl::GetAttributeGroupIterator( + const ReadOptions& read_options, + const std::vector& column_families) { + return NewMultiCfIterator( + read_options, column_families, + [](const Status& s) { return NewAttributeGroupErrorIterator(s); }); +} + Status TransactionBaseImpl::PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key, const WideColumns& columns, diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 631562eb35..57da3417b0 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -146,6 +146,10 @@ class TransactionBaseImpl : public Transaction { Iterator* GetIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) override; + std::unique_ptr GetAttributeGroupIterator( + const ReadOptions& read_options, + const std::vector& column_families) override; + Status Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value, const bool assume_tracked = false) override; Status Put(const Slice& key, const Slice& value) override { @@ -304,6 +308,13 @@ class TransactionBaseImpl : public Transaction { LockTracker& GetTrackedLocks() { return *tracked_locks_; } protected: + template + std::unique_ptr NewMultiCfIterator( + const ReadOptions& read_options, + const std::vector& column_families, + ErrorIteratorFuncType error_iterator_func); + Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 9462960224..59937b2af8 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -13,6 +13,7 @@ #include "db/db_impl/db_impl.h" #include "port/port.h" +#include "rocksdb/attribute_groups.h" #include "rocksdb/db.h" #include "rocksdb/options.h" #include "rocksdb/perf_context.h" @@ -7464,6 +7465,273 @@ TEST_P(TransactionTest, PutEntityRecovery) { } } +TEST_P(TransactionTest, AttributeGroupIterator) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + ROCKSDB_GTEST_BYPASS("Test only WriteCommitted for now"); + return; + } + + ColumnFamilyOptions cf_opts; + cf_opts.enable_blob_files = true; + + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + // Note: "cf1" keys are present only in CF1; "cf2" keys are only present in + // CF2; "cf12" keys are present in both CFs. "a" keys are present only in the + // database; "b" keys are present only in the transaction; "c" keys are + // present in both the database and the transaction. The values indicate the + // column family as well as whether the entry came from the database or the + // transaction. + + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1")); + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1")); + + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2")); + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2")); + + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1")); + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2")); + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1")); + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2")); + + ASSERT_OK(db->Flush(FlushOptions(), cfh1)); + ASSERT_OK(db->Flush(FlushOptions(), cfh2)); + + std::unique_ptr txn(db->BeginTransaction(WriteOptions())); + + ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1")); + + ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2")); + + ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2")); + + auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) { + ReadOptions read_options; + read_options.allow_unprepared_value = allow_unprepared_value; + + std::unique_ptr iter( + txn->GetAttributeGroupIterator(read_options, {cfh1, cfh2})); + + { + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_a"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_a_db_cf1"}}; + WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_a_db_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}, + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_b"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_b_txn_cf1"}}; + WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_b_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}, + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_c"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_c_txn_cf1"}}; + WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_c_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}, + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_a"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_a_db_cf1"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_b"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_b_txn_cf1"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_c"); + + prepare_if_needed(iter.get()); + + WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_c_txn_cf1"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh1, &cf1_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_a"); + + prepare_if_needed(iter.get()); + + WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_a_db_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_b"); + + prepare_if_needed(iter.get()); + + WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_b_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_c"); + + prepare_if_needed(iter.get()); + + WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_c_txn_cf2"}}; + IteratorAttributeGroups expected{ + IteratorAttributeGroup{cfh2, &cf2_columns}}; + ASSERT_EQ(iter->attribute_groups(), expected); + } + + { + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + verify(/* allow_unprepared_value */ false, [](AttributeGroupIterator*) {}); + verify(/* allow_unprepared_value */ true, [](AttributeGroupIterator* iter) { + ASSERT_TRUE(iter->attribute_groups().empty()); + ASSERT_TRUE(iter->PrepareValue()); + }); +} + +TEST_P(TransactionTest, AttributeGroupIteratorSanityChecks) { + ColumnFamilyOptions cf1_opts; + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf1_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyOptions cf2_opts; + cf2_opts.comparator = ReverseBytewiseComparator(); + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf2_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + std::unique_ptr txn(db->BeginTransaction(WriteOptions())); + + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + { + std::unique_ptr iter( + txn->GetAttributeGroupIterator(ReadOptions(), {})); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + + { + std::unique_ptr iter( + txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2})); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + + { + ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + + std::unique_ptr iter( + txn->GetAttributeGroupIterator(read_options, {cfh1})); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + + return; + } + + { + std::unique_ptr iter( + txn->GetAttributeGroupIterator(ReadOptions(), {})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + std::unique_ptr iter( + txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + + std::unique_ptr iter( + txn->GetAttributeGroupIterator(read_options, {cfh1})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } +} + TEST_F(TransactionDBTest, CollapseKey) { ASSERT_OK(ReOpen()); ASSERT_OK(db->Put({}, "hello", "world")); diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index d4b9d14d3a..6af60b44ec 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -3,13 +3,13 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). - #include "utilities/transactions/write_prepared_txn.h" #include #include #include +#include "db/attribute_group_iterator_impl.h" #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "rocksdb/db.h" @@ -135,6 +135,15 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, return write_batch_.NewIteratorWithBase(column_family, db_iter, &options); } +std::unique_ptr +WritePreparedTxn::GetAttributeGroupIterator( + const ReadOptions& /* read_options */, + const std::vector& /* column_families */) { + return NewAttributeGroupErrorIterator( + Status::NotSupported("GetAttributeGroupIterator not supported for " + "write-prepared/write-unprepared transactions")); +} + Status WritePreparedTxn::PrepareInternal() { WriteOptions write_options = write_options_; write_options.disableWAL = false; diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 24e667f78a..b5ff8b8497 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -5,7 +5,6 @@ #pragma once - #include #include #include @@ -71,6 +70,10 @@ class WritePreparedTxn : public PessimisticTransaction { Iterator* GetIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; + std::unique_ptr GetAttributeGroupIterator( + const ReadOptions& read_options, + const std::vector& column_families) override; + void SetSnapshot() override; protected: