Add a new API Transaction::GetCoalescingIterator (#13128)

Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13128

Similarly to https://github.com/facebook/rocksdb/pull/13119, the patch adds a new API `Transaction::GetCoalescingIterator` that can be used to create a multi-column-family coalescing iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions.

Reviewed By: jowlyzhang

Differential Revision: D65682389

fbshipit-source-id: faf5dd1de9bce9d403fc34246ecab4c55572a228
This commit is contained in:
Levi Tamasi 2024-11-08 14:14:39 -08:00 committed by Facebook GitHub Bot
parent ee258619be
commit 9b95fbbf24
8 changed files with 478 additions and 0 deletions

View File

@ -477,6 +477,21 @@ class Transaction {
virtual Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) = 0;
// Returns a multi-column-family coalescing iterator for the given column
// families that includes both keys in the DB and uncommitted keys in this
// transaction.
//
// Setting read_options.snapshot will affect what is read from the
// DB but will NOT change which keys are read from this transaction (the keys
// in this transaction do not yet belong to any snapshot and will be fetched
// regardless).
//
// The returned iterator is only valid until Commit(), Rollback(), or
// RollbackToSavePoint() is called.
//
// Failures are reported through status() of the returned iterator. This API
// is currently supported for optimistic and write-committed pessimistic
// transactions; write-prepared/write-unprepared transactions return an
// iterator whose status() is NotSupported. For the supported transaction
// types, the accompanying unit tests expect an InvalidArgument status when
// column_families is empty and when the column families were created with
// different comparators.
virtual std::unique_ptr<Iterator> GetCoalescingIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
// Returns a multi-column-family attribute group iterator for the given column
// families that includes both keys in the DB and uncommitted keys in this
// transaction.

View File

@ -0,0 +1 @@
Added a new API `Transaction::GetCoalescingIterator` that can be used to create a multi-column-family coalescing iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions.

View File

@ -2222,6 +2222,208 @@ TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) {
}
}
TEST_P(OptimisticTransactionTest, CoalescingIterator) {
  // Checks that a coalescing iterator created from an optimistic transaction
  // yields the union of the committed data in two column families and the
  // transaction's own uncommitted writes.
  ColumnFamilyOptions blob_cf_options;
  blob_cf_options.enable_blob_files = true;

  ColumnFamilyHandle* cfh1 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(blob_cf_options, "cf1", &cfh1));
  std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);

  ColumnFamilyHandle* cfh2 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(blob_cf_options, "cf2", &cfh2));
  std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);

  // Key naming scheme:
  //   prefix "cf1"  - key exists only in CF1
  //   prefix "cf2"  - key exists only in CF2
  //   prefix "cf12" - key exists in both CFs
  //   suffix "a"    - key exists only in the database
  //   suffix "b"    - key exists only in the transaction
  //   suffix "c"    - key exists in both the database and the transaction
  // Each value encodes the key, the origin (db/txn), and the column family.
  struct KeyValueWrite {
    ColumnFamilyHandle* cf;
    const char* key;
    const char* value;
  };

  // Data committed directly to the database (and then flushed).
  const KeyValueWrite db_writes[] = {
      {cfh1, "cf1_a", "cf1_a_db_cf1"},   {cfh1, "cf1_c", "cf1_c_db_cf1"},
      {cfh2, "cf2_a", "cf2_a_db_cf2"},   {cfh2, "cf2_c", "cf2_c_db_cf2"},
      {cfh1, "cf12_a", "cf12_a_db_cf1"}, {cfh2, "cf12_a", "cf12_a_db_cf2"},
      {cfh1, "cf12_c", "cf12_c_db_cf1"}, {cfh2, "cf12_c", "cf12_c_db_cf2"},
  };
  for (const KeyValueWrite& write : db_writes) {
    ASSERT_OK(txn_db->Put(WriteOptions(), write.cf, write.key, write.value));
  }

  ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1));
  ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2));

  std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));

  // Uncommitted data that lives only in the transaction.
  const KeyValueWrite txn_writes[] = {
      {cfh1, "cf1_b", "cf1_b_txn_cf1"},   {cfh1, "cf1_c", "cf1_c_txn_cf1"},
      {cfh2, "cf2_b", "cf2_b_txn_cf2"},   {cfh2, "cf2_c", "cf2_c_txn_cf2"},
      {cfh1, "cf12_b", "cf12_b_txn_cf1"}, {cfh2, "cf12_b", "cf12_b_txn_cf2"},
      {cfh1, "cf12_c", "cf12_c_txn_cf1"}, {cfh2, "cf12_c", "cf12_c_txn_cf2"},
  };
  for (const KeyValueWrite& write : txn_writes) {
    ASSERT_OK(txn->Put(write.cf, write.key, write.value));
  }

  // Entries the iterator is expected to produce, in iteration order. For keys
  // written to both column families the expected value is the one from cf2,
  // and for "c" keys the expected value is the one from the transaction.
  struct ExpectedEntry {
    const char* key;
    const char* value;
  };

  const ExpectedEntry expected_entries[] = {
      {"cf12_a", "cf12_a_db_cf2"}, {"cf12_b", "cf12_b_txn_cf2"},
      {"cf12_c", "cf12_c_txn_cf2"}, {"cf1_a", "cf1_a_db_cf1"},
      {"cf1_b", "cf1_b_txn_cf1"},   {"cf1_c", "cf1_c_txn_cf1"},
      {"cf2_a", "cf2_a_db_cf2"},    {"cf2_b", "cf2_b_txn_cf2"},
      {"cf2_c", "cf2_c_txn_cf2"},
  };

  // Walks the whole iterator forward and compares it against
  // expected_entries. prepare_if_needed is invoked on every entry after the
  // key has been checked and before the value is read.
  auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
    ReadOptions read_options;
    read_options.allow_unprepared_value = allow_unprepared_value;

    std::unique_ptr<Iterator> iter(
        txn->GetCoalescingIterator(read_options, {cfh1, cfh2}));

    iter->SeekToFirst();
    for (const ExpectedEntry& entry : expected_entries) {
      ASSERT_TRUE(iter->Valid());
      ASSERT_OK(iter->status());
      ASSERT_EQ(iter->key(), entry.key);
      prepare_if_needed(iter.get());
      ASSERT_EQ(iter->value(), entry.value);
      iter->Next();
    }

    // Stepping past the last expected entry must exhaust the iterator
    // without reporting an error.
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  };

  // With allow_unprepared_value off, no preparation step is performed.
  auto nothing_to_prepare = [](Iterator*) {};
  verify(/* allow_unprepared_value */ false, nothing_to_prepare);

  // With allow_unprepared_value on, the value is expected to be empty until
  // PrepareValue() has been called and has succeeded.
  auto prepare_value = [](Iterator* iter) {
    ASSERT_TRUE(iter->value().empty());
    ASSERT_TRUE(iter->PrepareValue());
  };
  verify(/* allow_unprepared_value */ true, prepare_value);
}
TEST_P(OptimisticTransactionTest, CoalescingIteratorSanityChecks) {
  // Checks that invalid requests produce an iterator whose status is
  // InvalidArgument.

  // cf1 is created with default-constructed options.
  ColumnFamilyOptions default_cf_options;
  ColumnFamilyHandle* cfh1 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(default_cf_options, "cf1", &cfh1));
  std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);

  // cf2 differs from cf1 in that it uses the reverse bytewise comparator.
  ColumnFamilyOptions reverse_cf_options;
  reverse_cf_options.comparator = ReverseBytewiseComparator();
  ColumnFamilyHandle* cfh2 = nullptr;
  ASSERT_OK(txn_db->CreateColumnFamily(reverse_cf_options, "cf2", &cfh2));
  std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);

  std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));

  // Case 1: no column families at all.
  {
    const std::vector<ColumnFamilyHandle*> no_column_families;
    auto iter = txn->GetCoalescingIterator(ReadOptions(), no_column_families);
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }

  // Case 2: column families that were created with different comparators.
  {
    const std::vector<ColumnFamilyHandle*> mixed_comparator_cfs{cfh1, cfh2};
    auto iter = txn->GetCoalescingIterator(ReadOptions(), mixed_comparator_cfs);
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }

  // Case 3: a single column family, but with io_activity set to kCompaction
  // in the read options.
  {
    ReadOptions compaction_read_options;
    compaction_read_options.io_activity = Env::IOActivity::kCompaction;

    const std::vector<ColumnFamilyHandle*> single_cf{cfh1};
    auto iter = txn->GetCoalescingIterator(compaction_read_options, single_cf);
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }
}
TEST_P(OptimisticTransactionTest, AttributeGroupIterator) {
ColumnFamilyOptions cf_opts;
cf_opts.enable_blob_files = true;

View File

@ -8,6 +8,7 @@
#include <cinttypes>
#include "db/attribute_group_iterator_impl.h"
#include "db/coalescing_iterator.h"
#include "db/column_family.h"
#include "db/db_impl/db_impl.h"
#include "logging/logging.h"
@ -536,6 +537,15 @@ std::unique_ptr<IterType> TransactionBaseImpl::NewMultiCfIterator(
std::move(cfh_iter_pairs));
}
// Creates a coalescing iterator over column_families by delegating to
// NewMultiCfIterator, instantiated for Iterator/CoalescingIterator. The
// callback handed to NewMultiCfIterator turns a Status into an error iterator
// carrying that status.
std::unique_ptr<Iterator> TransactionBaseImpl::GetCoalescingIterator(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families) {
  auto make_error_iterator = [](const Status& status) {
    return std::unique_ptr<Iterator>(NewErrorIterator(status));
  };

  return NewMultiCfIterator<Iterator, CoalescingIterator>(
      read_options, column_families, make_error_iterator);
}
std::unique_ptr<AttributeGroupIterator>
TransactionBaseImpl::GetAttributeGroupIterator(
const ReadOptions& read_options,

View File

@ -146,6 +146,10 @@ class TransactionBaseImpl : public Transaction {
Iterator* GetIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) override;
// Overrides Transaction::GetCoalescingIterator. The definition delegates to
// NewMultiCfIterator<Iterator, CoalescingIterator> and supplies a callback
// that wraps a Status in an error iterator.
std::unique_ptr<Iterator> GetCoalescingIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
// Overrides Transaction::GetAttributeGroupIterator, which returns a
// multi-column-family attribute group iterator for the given column families
// that includes both keys in the DB and uncommitted keys in the transaction.
std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) override;

View File

@ -7465,6 +7465,240 @@ TEST_P(TransactionTest, PutEntityRecovery) {
}
}
TEST_P(TransactionTest, CoalescingIterator) {
  // Checks that a coalescing iterator created from a write-committed
  // pessimistic transaction yields the union of the committed data in two
  // column families and the transaction's own uncommitted writes.
  if (std::get<2>(GetParam()) != TxnDBWritePolicy::WRITE_COMMITTED) {
    ROCKSDB_GTEST_BYPASS("Test only WriteCommitted for now");
    return;
  }

  ColumnFamilyOptions blob_cf_options;
  blob_cf_options.enable_blob_files = true;

  ColumnFamilyHandle* cfh1 = nullptr;
  ASSERT_OK(db->CreateColumnFamily(blob_cf_options, "cf1", &cfh1));
  std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);

  ColumnFamilyHandle* cfh2 = nullptr;
  ASSERT_OK(db->CreateColumnFamily(blob_cf_options, "cf2", &cfh2));
  std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);

  // Key naming scheme:
  //   prefix "cf1"  - key exists only in CF1
  //   prefix "cf2"  - key exists only in CF2
  //   prefix "cf12" - key exists in both CFs
  //   suffix "a"    - key exists only in the database
  //   suffix "b"    - key exists only in the transaction
  //   suffix "c"    - key exists in both the database and the transaction
  // Each value encodes the key, the origin (db/txn), and the column family.
  struct KeyValueWrite {
    ColumnFamilyHandle* cf;
    const char* key;
    const char* value;
  };

  // Data committed directly to the database (and then flushed).
  const KeyValueWrite db_writes[] = {
      {cfh1, "cf1_a", "cf1_a_db_cf1"},   {cfh1, "cf1_c", "cf1_c_db_cf1"},
      {cfh2, "cf2_a", "cf2_a_db_cf2"},   {cfh2, "cf2_c", "cf2_c_db_cf2"},
      {cfh1, "cf12_a", "cf12_a_db_cf1"}, {cfh2, "cf12_a", "cf12_a_db_cf2"},
      {cfh1, "cf12_c", "cf12_c_db_cf1"}, {cfh2, "cf12_c", "cf12_c_db_cf2"},
  };
  for (const KeyValueWrite& write : db_writes) {
    ASSERT_OK(db->Put(WriteOptions(), write.cf, write.key, write.value));
  }

  ASSERT_OK(db->Flush(FlushOptions(), cfh1));
  ASSERT_OK(db->Flush(FlushOptions(), cfh2));

  std::unique_ptr<Transaction> txn(db->BeginTransaction(WriteOptions()));

  // Uncommitted data that lives only in the transaction.
  const KeyValueWrite txn_writes[] = {
      {cfh1, "cf1_b", "cf1_b_txn_cf1"},   {cfh1, "cf1_c", "cf1_c_txn_cf1"},
      {cfh2, "cf2_b", "cf2_b_txn_cf2"},   {cfh2, "cf2_c", "cf2_c_txn_cf2"},
      {cfh1, "cf12_b", "cf12_b_txn_cf1"}, {cfh2, "cf12_b", "cf12_b_txn_cf2"},
      {cfh1, "cf12_c", "cf12_c_txn_cf1"}, {cfh2, "cf12_c", "cf12_c_txn_cf2"},
  };
  for (const KeyValueWrite& write : txn_writes) {
    ASSERT_OK(txn->Put(write.cf, write.key, write.value));
  }

  // Entries the iterator is expected to produce, in iteration order. For keys
  // written to both column families the expected value is the one from cf2,
  // and for "c" keys the expected value is the one from the transaction.
  struct ExpectedEntry {
    const char* key;
    const char* value;
  };

  const ExpectedEntry expected_entries[] = {
      {"cf12_a", "cf12_a_db_cf2"}, {"cf12_b", "cf12_b_txn_cf2"},
      {"cf12_c", "cf12_c_txn_cf2"}, {"cf1_a", "cf1_a_db_cf1"},
      {"cf1_b", "cf1_b_txn_cf1"},   {"cf1_c", "cf1_c_txn_cf1"},
      {"cf2_a", "cf2_a_db_cf2"},    {"cf2_b", "cf2_b_txn_cf2"},
      {"cf2_c", "cf2_c_txn_cf2"},
  };

  // Walks the whole iterator forward and compares it against
  // expected_entries. prepare_if_needed is invoked on every entry after the
  // key has been checked and before the value is read.
  auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
    ReadOptions read_options;
    read_options.allow_unprepared_value = allow_unprepared_value;

    std::unique_ptr<Iterator> iter(
        txn->GetCoalescingIterator(read_options, {cfh1, cfh2}));

    iter->SeekToFirst();
    for (const ExpectedEntry& entry : expected_entries) {
      ASSERT_TRUE(iter->Valid());
      ASSERT_OK(iter->status());
      ASSERT_EQ(iter->key(), entry.key);
      prepare_if_needed(iter.get());
      ASSERT_EQ(iter->value(), entry.value);
      iter->Next();
    }

    // Stepping past the last expected entry must exhaust the iterator
    // without reporting an error.
    ASSERT_FALSE(iter->Valid());
    ASSERT_OK(iter->status());
  };

  // With allow_unprepared_value off, no preparation step is performed.
  auto nothing_to_prepare = [](Iterator*) {};
  verify(/* allow_unprepared_value */ false, nothing_to_prepare);

  // With allow_unprepared_value on, the value is expected to be empty until
  // PrepareValue() has been called and has succeeded.
  auto prepare_value = [](Iterator* iter) {
    ASSERT_TRUE(iter->value().empty());
    ASSERT_TRUE(iter->PrepareValue());
  };
  verify(/* allow_unprepared_value */ true, prepare_value);
}
TEST_P(TransactionTest, CoalescingIteratorSanityChecks) {
  // Checks the status carried by the iterator for a set of invalid requests.
  // Write-committed transactions are expected to report InvalidArgument for
  // each of them; every other write policy is expected to report NotSupported
  // regardless of the arguments.

  // cf1 is created with default-constructed options.
  ColumnFamilyOptions default_cf_options;
  ColumnFamilyHandle* cfh1 = nullptr;
  ASSERT_OK(db->CreateColumnFamily(default_cf_options, "cf1", &cfh1));
  std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);

  // cf2 differs from cf1 in that it uses the reverse bytewise comparator.
  ColumnFamilyOptions reverse_cf_options;
  reverse_cf_options.comparator = ReverseBytewiseComparator();
  ColumnFamilyHandle* cfh2 = nullptr;
  ASSERT_OK(db->CreateColumnFamily(reverse_cf_options, "cf2", &cfh2));
  std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);

  std::unique_ptr<Transaction> txn(db->BeginTransaction(WriteOptions()));

  const bool is_write_committed =
      std::get<2>(GetParam()) == TxnDBWritePolicy::WRITE_COMMITTED;

  // True if the status is the error expected under the current write policy.
  const auto has_expected_error = [is_write_committed](const Status& status) {
    return is_write_committed ? status.IsInvalidArgument()
                              : status.IsNotSupported();
  };

  // Case 1: no column families at all.
  {
    auto iter = txn->GetCoalescingIterator(ReadOptions(), {});
    ASSERT_TRUE(has_expected_error(iter->status()));
  }

  // Case 2: column families that were created with different comparators.
  {
    auto iter = txn->GetCoalescingIterator(ReadOptions(), {cfh1, cfh2});
    ASSERT_TRUE(has_expected_error(iter->status()));
  }

  // Case 3: a single column family, but with io_activity set to kCompaction
  // in the read options.
  {
    ReadOptions compaction_read_options;
    compaction_read_options.io_activity = Env::IOActivity::kCompaction;

    auto iter = txn->GetCoalescingIterator(compaction_read_options, {cfh1});
    ASSERT_TRUE(has_expected_error(iter->status()));
  }
}
TEST_P(TransactionTest, AttributeGroupIterator) {
const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) {

View File

@ -135,6 +135,14 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
return write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
}
std::unique_ptr<Iterator> WritePreparedTxn::GetCoalescingIterator(
const ReadOptions& /* read_options */,
const std::vector<ColumnFamilyHandle*>& /* column_families */) {
return std::unique_ptr<Iterator>(NewErrorIterator(
Status::NotSupported("GetCoalescingIterator not supported for "
"write-prepared/write-unprepared transactions")));
}
std::unique_ptr<AttributeGroupIterator>
WritePreparedTxn::GetAttributeGroupIterator(
const ReadOptions& /* read_options */,

View File

@ -70,6 +70,10 @@ class WritePreparedTxn : public PessimisticTransaction {
Iterator* GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) override;
// Not supported for write-prepared/write-unprepared transactions: the
// definition ignores both arguments and returns an error iterator created
// with a NotSupported status.
std::unique_ptr<Iterator> GetCoalescingIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
// Override of the attribute group iterator factory for this transaction type.
// NOTE(review): the definition leaves both parameters unnamed, which suggests
// it rejects the request the same way GetCoalescingIterator does - confirm
// against the definition in the .cc file.
std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families) override;