diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 1fa7156323..6c444ac26d 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -477,6 +477,21 @@ class Transaction { virtual Iterator* GetIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) = 0; + // Returns a multi-column-family coalescing iterator for the given column + // families that includes both keys in the DB and uncommitted keys in this + // transaction. + // + // Setting read_options.snapshot will affect what is read from the + // DB but will NOT change which keys are read from this transaction (the keys + // in this transaction do not yet belong to any snapshot and will be fetched + // regardless). + // + // The returned iterator is only valid until Commit(), Rollback(), or + // RollbackToSavePoint() is called. + virtual std::unique_ptr GetCoalescingIterator( + const ReadOptions& read_options, + const std::vector& column_families) = 0; + // Returns a multi-column-family attribute group iterator for the given column // families that includes both keys in the DB and uncommitted keys in this // transaction. diff --git a/unreleased_history/public_api_changes/txn_get_coalescing_iterator.md b/unreleased_history/public_api_changes/txn_get_coalescing_iterator.md new file mode 100644 index 0000000000..45dafd3025 --- /dev/null +++ b/unreleased_history/public_api_changes/txn_get_coalescing_iterator.md @@ -0,0 +1 @@ +Added a new API `Transaction::GetCoalescingIterator` that can be used to create a multi-column-family coalescing iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions. diff --git a/utilities/transactions/optimistic_transaction_test.cc b/utilities/transactions/optimistic_transaction_test.cc index 4d1c52df35..81def3902a 100644 --- a/utilities/transactions/optimistic_transaction_test.cc +++ b/utilities/transactions/optimistic_transaction_test.cc @@ -2222,6 +2222,208 @@ TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) { } } +TEST_P(OptimisticTransactionTest, CoalescingIterator) { + ColumnFamilyOptions cf_opts; + cf_opts.enable_blob_files = true; + + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + // Note: "cf1" keys are present only in CF1; "cf2" keys are only present in + // CF2; "cf12" keys are present in both CFs. "a" keys are present only in the + // database; "b" keys are present only in the transaction; "c" keys are + // present in both the database and the transaction. The values indicate the + // column family as well as whether the entry came from the database or the + // transaction. + + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1")); + + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2")); + + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1")); + ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2")); + + ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1)); + ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2)); + + std::unique_ptr txn(txn_db->BeginTransaction(WriteOptions())); + + ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1")); + + ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2")); + + ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2")); + + auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) { + ReadOptions read_options; + read_options.allow_unprepared_value = allow_unprepared_value; + + std::unique_ptr iter( + txn->GetCoalescingIterator(read_options, {cfh1, cfh2})); + + { + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_a"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf12_a_db_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_b"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf12_b_txn_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_c"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf12_c_txn_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_a"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf1_a_db_cf1"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_b"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf1_b_txn_cf1"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_c"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf1_c_txn_cf1"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_a"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf2_a_db_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_b"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf2_b_txn_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_c"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf2_c_txn_cf2"); + } + + { + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + verify(/* allow_unprepared_value */ false, [](Iterator*) {}); + verify(/* allow_unprepared_value */ true, [](Iterator* iter) { + ASSERT_TRUE(iter->value().empty()); + ASSERT_TRUE(iter->PrepareValue()); + }); +} + +TEST_P(OptimisticTransactionTest, CoalescingIteratorSanityChecks) { + ColumnFamilyOptions cf1_opts; + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf1_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyOptions cf2_opts; + cf2_opts.comparator = ReverseBytewiseComparator(); + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(txn_db->CreateColumnFamily(cf2_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + std::unique_ptr txn(txn_db->BeginTransaction(WriteOptions())); + + { + std::unique_ptr iter( + txn->GetCoalescingIterator(ReadOptions(), {})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + std::unique_ptr iter( + txn->GetCoalescingIterator(ReadOptions(), {cfh1, cfh2})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + + std::unique_ptr iter( + txn->GetCoalescingIterator(read_options, {cfh1})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } +} + TEST_P(OptimisticTransactionTest, AttributeGroupIterator) { ColumnFamilyOptions cf_opts; cf_opts.enable_blob_files = true; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 3fe511ddc8..063cc02b44 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -8,6 +8,7 @@ #include #include "db/attribute_group_iterator_impl.h" +#include "db/coalescing_iterator.h" #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "logging/logging.h" @@ -536,6 +537,15 @@ std::unique_ptr TransactionBaseImpl::NewMultiCfIterator( std::move(cfh_iter_pairs)); } +std::unique_ptr TransactionBaseImpl::GetCoalescingIterator( + const ReadOptions& read_options, + const std::vector& column_families) { + return NewMultiCfIterator( + read_options, column_families, [](const Status& s) { + return std::unique_ptr(NewErrorIterator(s)); + }); +} + std::unique_ptr TransactionBaseImpl::GetAttributeGroupIterator( const ReadOptions& read_options, diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 57da3417b0..888ea17446 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -146,6 +146,10 @@ class TransactionBaseImpl : public Transaction { Iterator* GetIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) override; + std::unique_ptr GetCoalescingIterator( + const ReadOptions& read_options, + const std::vector& column_families) override; + std::unique_ptr GetAttributeGroupIterator( const ReadOptions& read_options, const std::vector& column_families) override; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 59937b2af8..37c11874aa 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -7465,6 +7465,240 @@ TEST_P(TransactionTest, PutEntityRecovery) { } } +TEST_P(TransactionTest, CoalescingIterator) { + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + ROCKSDB_GTEST_BYPASS("Test only WriteCommitted for now"); + return; + } + + ColumnFamilyOptions cf_opts; + cf_opts.enable_blob_files = true; + + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + // Note: "cf1" keys are present only in CF1; "cf2" keys are only present in + // CF2; "cf12" keys are present in both CFs. "a" keys are present only in the + // database; "b" keys are present only in the transaction; "c" keys are + // present in both the database and the transaction. The values indicate the + // column family as well as whether the entry came from the database or the + // transaction. + + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1")); + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1")); + + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2")); + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2")); + + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1")); + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2")); + ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1")); + ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2")); + + ASSERT_OK(db->Flush(FlushOptions(), cfh1)); + ASSERT_OK(db->Flush(FlushOptions(), cfh2)); + + std::unique_ptr txn(db->BeginTransaction(WriteOptions())); + + ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1")); + + ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2")); + + ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2")); + ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1")); + ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2")); + + auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) { + ReadOptions read_options; + read_options.allow_unprepared_value = allow_unprepared_value; + + std::unique_ptr iter( + txn->GetCoalescingIterator(read_options, {cfh1, cfh2})); + + { + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_a"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf12_a_db_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_b"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf12_b_txn_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf12_c"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf12_c_txn_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_a"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf1_a_db_cf1"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_b"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf1_b_txn_cf1"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf1_c"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf1_c_txn_cf1"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_a"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf2_a_db_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_b"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf2_b_txn_cf2"); + } + + { + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), "cf2_c"); + + prepare_if_needed(iter.get()); + + ASSERT_EQ(iter->value(), "cf2_c_txn_cf2"); + } + + { + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + } + }; + + verify(/* allow_unprepared_value */ false, [](Iterator*) {}); + verify(/* allow_unprepared_value */ true, [](Iterator* iter) { + ASSERT_TRUE(iter->value().empty()); + ASSERT_TRUE(iter->PrepareValue()); + }); +} + +TEST_P(TransactionTest, CoalescingIteratorSanityChecks) { + ColumnFamilyOptions cf1_opts; + ColumnFamilyHandle* cfh1 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf1_opts, "cf1", &cfh1)); + std::unique_ptr cfh1_guard(cfh1); + + ColumnFamilyOptions cf2_opts; + cf2_opts.comparator = ReverseBytewiseComparator(); + ColumnFamilyHandle* cfh2 = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf2_opts, "cf2", &cfh2)); + std::unique_ptr cfh2_guard(cfh2); + + std::unique_ptr txn(db->BeginTransaction(WriteOptions())); + + const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); + if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { + { + std::unique_ptr iter( + txn->GetCoalescingIterator(ReadOptions(), {})); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + + { + std::unique_ptr iter( + txn->GetCoalescingIterator(ReadOptions(), {cfh1, cfh2})); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + + { + ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + + std::unique_ptr iter( + txn->GetCoalescingIterator(read_options, {cfh1})); + ASSERT_TRUE(iter->status().IsNotSupported()); + } + + return; + } + + { + std::unique_ptr iter( + txn->GetCoalescingIterator(ReadOptions(), {})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + std::unique_ptr iter( + txn->GetCoalescingIterator(ReadOptions(), {cfh1, cfh2})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + { + ReadOptions read_options; + read_options.io_activity = Env::IOActivity::kCompaction; + + std::unique_ptr iter( + txn->GetCoalescingIterator(read_options, {cfh1})); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } +} + TEST_P(TransactionTest, AttributeGroupIterator) { const TxnDBWritePolicy write_policy = std::get<2>(GetParam()); if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) { diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index 6af60b44ec..638bab7cd4 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -135,6 +135,14 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options, return write_batch_.NewIteratorWithBase(column_family, db_iter, &options); } +std::unique_ptr WritePreparedTxn::GetCoalescingIterator( + const ReadOptions& /* read_options */, + const std::vector& /* column_families */) { + return std::unique_ptr(NewErrorIterator( + Status::NotSupported("GetCoalescingIterator not supported for " + "write-prepared/write-unprepared transactions"))); +} + std::unique_ptr WritePreparedTxn::GetAttributeGroupIterator( const ReadOptions& /* read_options */, diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index b5ff8b8497..aca6a19ea0 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -70,6 +70,10 @@ class WritePreparedTxn : public PessimisticTransaction { Iterator* GetIterator(const ReadOptions& options, ColumnFamilyHandle* column_family) override; + std::unique_ptr GetCoalescingIterator( + const ReadOptions& read_options, + const std::vector& column_families) override; + std::unique_ptr GetAttributeGroupIterator( const ReadOptions& read_options, const std::vector& column_families) override;