mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-03 14:52:53 +00:00
Add a new API Transaction::GetAttributeGroupIterator (#13119)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/13119 The patch adds a new API `Transaction::GetAttributeGroupIterator` that can be used to create a multi-column-family attribute group iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions. Reviewed By: jowlyzhang Differential Revision: D65548324 fbshipit-source-id: 0fb8a22129494770fdba3d6024eef72b3e051136
This commit is contained in:
parent
dc34a0ff1e
commit
2ba4dceb4c
|
@ -4011,8 +4011,7 @@ std::unique_ptr<IterType> DBImpl::NewMultiCfIterator(
|
|||
cfh_iter_pairs;
|
||||
cfh_iter_pairs.reserve(column_families.size());
|
||||
for (size_t i = 0; i < column_families.size(); ++i) {
|
||||
cfh_iter_pairs.emplace_back(column_families[i],
|
||||
std::unique_ptr<Iterator>(child_iterators[i]));
|
||||
cfh_iter_pairs.emplace_back(column_families[i], child_iterators[i]);
|
||||
}
|
||||
|
||||
return std::make_unique<ImplType>(_read_options,
|
||||
|
|
|
@ -477,6 +477,21 @@ class Transaction {
|
|||
virtual Iterator* GetIterator(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family) = 0;
|
||||
|
||||
// Returns a multi-column-family attribute group iterator for the given column
|
||||
// families that includes both keys in the DB and uncommitted keys in this
|
||||
// transaction.
|
||||
//
|
||||
// Setting read_options.snapshot will affect what is read from the
|
||||
// DB but will NOT change which keys are read from this transaction (the keys
|
||||
// in this transaction do not yet belong to any snapshot and will be fetched
|
||||
// regardless).
|
||||
//
|
||||
// The returned iterator is only valid until Commit(), Rollback(), or
|
||||
// RollbackToSavePoint() is called.
|
||||
virtual std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
|
||||
|
||||
// Put, PutEntity, Merge, Delete, and SingleDelete behave similarly to the
|
||||
// corresponding functions in WriteBatch, but will also do conflict checking
|
||||
// on the keys being written.
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Added a new API `Transaction::GetAttributeGroupIterator` that can be used to create a multi-column-family attribute group iterator over the specified column families, including the data from both the transaction and the underlying database. This API is currently supported for optimistic and write-committed pessimistic transactions.
|
|
@ -2222,6 +2222,241 @@ TEST_P(OptimisticTransactionTest, EntityReadSanityChecks) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_P(OptimisticTransactionTest, AttributeGroupIterator) {
|
||||
ColumnFamilyOptions cf_opts;
|
||||
cf_opts.enable_blob_files = true;
|
||||
|
||||
ColumnFamilyHandle* cfh1 = nullptr;
|
||||
ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf1", &cfh1));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
|
||||
|
||||
ColumnFamilyHandle* cfh2 = nullptr;
|
||||
ASSERT_OK(txn_db->CreateColumnFamily(cf_opts, "cf2", &cfh2));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
|
||||
|
||||
// Note: "cf1" keys are present only in CF1; "cf2" keys are only present in
|
||||
// CF2; "cf12" keys are present in both CFs. "a" keys are present only in the
|
||||
// database; "b" keys are present only in the transaction; "c" keys are
|
||||
// present in both the database and the transaction. The values indicate the
|
||||
// column family as well as whether the entry came from the database or the
|
||||
// transaction.
|
||||
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1"));
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1"));
|
||||
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2"));
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2"));
|
||||
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1"));
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2"));
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1"));
|
||||
ASSERT_OK(txn_db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2"));
|
||||
|
||||
ASSERT_OK(txn_db->Flush(FlushOptions(), cfh1));
|
||||
ASSERT_OK(txn_db->Flush(FlushOptions(), cfh2));
|
||||
|
||||
std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
|
||||
|
||||
ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1"));
|
||||
ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1"));
|
||||
|
||||
ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2"));
|
||||
ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2"));
|
||||
|
||||
ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1"));
|
||||
ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2"));
|
||||
ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1"));
|
||||
ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2"));
|
||||
|
||||
auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
|
||||
ReadOptions read_options;
|
||||
read_options.allow_unprepared_value = allow_unprepared_value;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(read_options, {cfh1, cfh2}));
|
||||
|
||||
{
|
||||
iter->SeekToFirst();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf12_a");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_a_db_cf1"}};
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_a_db_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns},
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf12_b");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_b_txn_cf1"}};
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_b_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns},
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf12_c");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_c_txn_cf1"}};
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_c_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns},
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf1_a");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_a_db_cf1"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf1_b");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_b_txn_cf1"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf1_c");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_c_txn_cf1"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf2_a");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_a_db_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf2_b");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_b_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf2_c");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_c_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_FALSE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
}
|
||||
};
|
||||
|
||||
verify(/* allow_unprepared_value */ false, [](AttributeGroupIterator*) {});
|
||||
verify(/* allow_unprepared_value */ true, [](AttributeGroupIterator* iter) {
|
||||
ASSERT_TRUE(iter->attribute_groups().empty());
|
||||
ASSERT_TRUE(iter->PrepareValue());
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(OptimisticTransactionTest, AttributeGroupIteratorSanityChecks) {
|
||||
ColumnFamilyOptions cf1_opts;
|
||||
ColumnFamilyHandle* cfh1 = nullptr;
|
||||
ASSERT_OK(txn_db->CreateColumnFamily(cf1_opts, "cf1", &cfh1));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
|
||||
|
||||
ColumnFamilyOptions cf2_opts;
|
||||
cf2_opts.comparator = ReverseBytewiseComparator();
|
||||
ColumnFamilyHandle* cfh2 = nullptr;
|
||||
ASSERT_OK(txn_db->CreateColumnFamily(cf2_opts, "cf2", &cfh2));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
|
||||
|
||||
std::unique_ptr<Transaction> txn(txn_db->BeginTransaction(WriteOptions()));
|
||||
|
||||
{
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(ReadOptions(), {}));
|
||||
ASSERT_TRUE(iter->status().IsInvalidArgument());
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2}));
|
||||
ASSERT_TRUE(iter->status().IsInvalidArgument());
|
||||
}
|
||||
|
||||
{
|
||||
ReadOptions read_options;
|
||||
read_options.io_activity = Env::IOActivity::kCompaction;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(read_options, {cfh1}));
|
||||
ASSERT_TRUE(iter->status().IsInvalidArgument());
|
||||
}
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_CASE_P(
|
||||
InstanceOccGroup, OptimisticTransactionTest,
|
||||
testing::Values(OccValidationPolicy::kValidateSerial,
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
#include <cinttypes>
|
||||
|
||||
#include "db/attribute_group_iterator_impl.h"
|
||||
#include "db/column_family.h"
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "logging/logging.h"
|
||||
|
@ -486,6 +487,64 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
|
|||
&read_options);
|
||||
}
|
||||
|
||||
template <typename IterType, typename ImplType, typename ErrorIteratorFuncType>
|
||||
std::unique_ptr<IterType> TransactionBaseImpl::NewMultiCfIterator(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families,
|
||||
ErrorIteratorFuncType error_iterator_func) {
|
||||
if (column_families.empty()) {
|
||||
return error_iterator_func(
|
||||
Status::InvalidArgument("No Column Family was provided"));
|
||||
}
|
||||
|
||||
const Comparator* const first_comparator =
|
||||
column_families[0]->GetComparator();
|
||||
assert(first_comparator);
|
||||
|
||||
for (size_t i = 1; i < column_families.size(); ++i) {
|
||||
const Comparator* cf_comparator = column_families[i]->GetComparator();
|
||||
assert(cf_comparator);
|
||||
|
||||
if (first_comparator != cf_comparator &&
|
||||
first_comparator->GetId() != cf_comparator->GetId()) {
|
||||
return error_iterator_func(Status::InvalidArgument(
|
||||
"Different comparators are being used across CFs"));
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<Iterator*> child_iterators;
|
||||
const Status s =
|
||||
db_->NewIterators(read_options, column_families, &child_iterators);
|
||||
if (!s.ok()) {
|
||||
return error_iterator_func(s);
|
||||
}
|
||||
|
||||
assert(column_families.size() == child_iterators.size());
|
||||
|
||||
std::vector<std::pair<ColumnFamilyHandle*, std::unique_ptr<Iterator>>>
|
||||
cfh_iter_pairs;
|
||||
cfh_iter_pairs.reserve(column_families.size());
|
||||
for (size_t i = 0; i < column_families.size(); ++i) {
|
||||
cfh_iter_pairs.emplace_back(
|
||||
column_families[i],
|
||||
write_batch_.NewIteratorWithBase(column_families[i], child_iterators[i],
|
||||
&read_options));
|
||||
}
|
||||
|
||||
return std::make_unique<ImplType>(read_options,
|
||||
column_families[0]->GetComparator(),
|
||||
std::move(cfh_iter_pairs));
|
||||
}
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator>
|
||||
TransactionBaseImpl::GetAttributeGroupIterator(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families) {
|
||||
return NewMultiCfIterator<AttributeGroupIterator, AttributeGroupIteratorImpl>(
|
||||
read_options, column_families,
|
||||
[](const Status& s) { return NewAttributeGroupErrorIterator(s); });
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::PutEntityImpl(ColumnFamilyHandle* column_family,
|
||||
const Slice& key,
|
||||
const WideColumns& columns,
|
||||
|
|
|
@ -146,6 +146,10 @@ class TransactionBaseImpl : public Transaction {
|
|||
Iterator* GetIterator(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family) override;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families) override;
|
||||
|
||||
Status Put(ColumnFamilyHandle* column_family, const Slice& key,
|
||||
const Slice& value, const bool assume_tracked = false) override;
|
||||
Status Put(const Slice& key, const Slice& value) override {
|
||||
|
@ -304,6 +308,13 @@ class TransactionBaseImpl : public Transaction {
|
|||
LockTracker& GetTrackedLocks() { return *tracked_locks_; }
|
||||
|
||||
protected:
|
||||
template <typename IterType, typename ImplType,
|
||||
typename ErrorIteratorFuncType>
|
||||
std::unique_ptr<IterType> NewMultiCfIterator(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families,
|
||||
ErrorIteratorFuncType error_iterator_func);
|
||||
|
||||
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, std::string* value) override;
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "port/port.h"
|
||||
#include "rocksdb/attribute_groups.h"
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/options.h"
|
||||
#include "rocksdb/perf_context.h"
|
||||
|
@ -7464,6 +7465,273 @@ TEST_P(TransactionTest, PutEntityRecovery) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_P(TransactionTest, AttributeGroupIterator) {
|
||||
const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
|
||||
if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) {
|
||||
ROCKSDB_GTEST_BYPASS("Test only WriteCommitted for now");
|
||||
return;
|
||||
}
|
||||
|
||||
ColumnFamilyOptions cf_opts;
|
||||
cf_opts.enable_blob_files = true;
|
||||
|
||||
ColumnFamilyHandle* cfh1 = nullptr;
|
||||
ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf1", &cfh1));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
|
||||
|
||||
ColumnFamilyHandle* cfh2 = nullptr;
|
||||
ASSERT_OK(db->CreateColumnFamily(cf_opts, "cf2", &cfh2));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
|
||||
|
||||
// Note: "cf1" keys are present only in CF1; "cf2" keys are only present in
|
||||
// CF2; "cf12" keys are present in both CFs. "a" keys are present only in the
|
||||
// database; "b" keys are present only in the transaction; "c" keys are
|
||||
// present in both the database and the transaction. The values indicate the
|
||||
// column family as well as whether the entry came from the database or the
|
||||
// transaction.
|
||||
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf1_a", "cf1_a_db_cf1"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf1_c", "cf1_c_db_cf1"));
|
||||
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf2_a", "cf2_a_db_cf2"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf2_c", "cf2_c_db_cf2"));
|
||||
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf12_a", "cf12_a_db_cf1"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf12_a", "cf12_a_db_cf2"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh1, "cf12_c", "cf12_c_db_cf1"));
|
||||
ASSERT_OK(db->Put(WriteOptions(), cfh2, "cf12_c", "cf12_c_db_cf2"));
|
||||
|
||||
ASSERT_OK(db->Flush(FlushOptions(), cfh1));
|
||||
ASSERT_OK(db->Flush(FlushOptions(), cfh2));
|
||||
|
||||
std::unique_ptr<Transaction> txn(db->BeginTransaction(WriteOptions()));
|
||||
|
||||
ASSERT_OK(txn->Put(cfh1, "cf1_b", "cf1_b_txn_cf1"));
|
||||
ASSERT_OK(txn->Put(cfh1, "cf1_c", "cf1_c_txn_cf1"));
|
||||
|
||||
ASSERT_OK(txn->Put(cfh2, "cf2_b", "cf2_b_txn_cf2"));
|
||||
ASSERT_OK(txn->Put(cfh2, "cf2_c", "cf2_c_txn_cf2"));
|
||||
|
||||
ASSERT_OK(txn->Put(cfh1, "cf12_b", "cf12_b_txn_cf1"));
|
||||
ASSERT_OK(txn->Put(cfh2, "cf12_b", "cf12_b_txn_cf2"));
|
||||
ASSERT_OK(txn->Put(cfh1, "cf12_c", "cf12_c_txn_cf1"));
|
||||
ASSERT_OK(txn->Put(cfh2, "cf12_c", "cf12_c_txn_cf2"));
|
||||
|
||||
auto verify = [&](bool allow_unprepared_value, auto prepare_if_needed) {
|
||||
ReadOptions read_options;
|
||||
read_options.allow_unprepared_value = allow_unprepared_value;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(read_options, {cfh1, cfh2}));
|
||||
|
||||
{
|
||||
iter->SeekToFirst();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf12_a");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_a_db_cf1"}};
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_a_db_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns},
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf12_b");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_b_txn_cf1"}};
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_b_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns},
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf12_c");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf12_c_txn_cf1"}};
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf12_c_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns},
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf1_a");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_a_db_cf1"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf1_b");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_b_txn_cf1"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf1_c");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf1_columns{{kDefaultWideColumnName, "cf1_c_txn_cf1"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh1, &cf1_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf2_a");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_a_db_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf2_b");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_b_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_TRUE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
ASSERT_EQ(iter->key(), "cf2_c");
|
||||
|
||||
prepare_if_needed(iter.get());
|
||||
|
||||
WideColumns cf2_columns{{kDefaultWideColumnName, "cf2_c_txn_cf2"}};
|
||||
IteratorAttributeGroups expected{
|
||||
IteratorAttributeGroup{cfh2, &cf2_columns}};
|
||||
ASSERT_EQ(iter->attribute_groups(), expected);
|
||||
}
|
||||
|
||||
{
|
||||
iter->Next();
|
||||
ASSERT_FALSE(iter->Valid());
|
||||
ASSERT_OK(iter->status());
|
||||
}
|
||||
};
|
||||
|
||||
verify(/* allow_unprepared_value */ false, [](AttributeGroupIterator*) {});
|
||||
verify(/* allow_unprepared_value */ true, [](AttributeGroupIterator* iter) {
|
||||
ASSERT_TRUE(iter->attribute_groups().empty());
|
||||
ASSERT_TRUE(iter->PrepareValue());
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(TransactionTest, AttributeGroupIteratorSanityChecks) {
|
||||
ColumnFamilyOptions cf1_opts;
|
||||
ColumnFamilyHandle* cfh1 = nullptr;
|
||||
ASSERT_OK(db->CreateColumnFamily(cf1_opts, "cf1", &cfh1));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh1_guard(cfh1);
|
||||
|
||||
ColumnFamilyOptions cf2_opts;
|
||||
cf2_opts.comparator = ReverseBytewiseComparator();
|
||||
ColumnFamilyHandle* cfh2 = nullptr;
|
||||
ASSERT_OK(db->CreateColumnFamily(cf2_opts, "cf2", &cfh2));
|
||||
std::unique_ptr<ColumnFamilyHandle> cfh2_guard(cfh2);
|
||||
|
||||
std::unique_ptr<Transaction> txn(db->BeginTransaction(WriteOptions()));
|
||||
|
||||
const TxnDBWritePolicy write_policy = std::get<2>(GetParam());
|
||||
if (write_policy != TxnDBWritePolicy::WRITE_COMMITTED) {
|
||||
{
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(ReadOptions(), {}));
|
||||
ASSERT_TRUE(iter->status().IsNotSupported());
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2}));
|
||||
ASSERT_TRUE(iter->status().IsNotSupported());
|
||||
}
|
||||
|
||||
{
|
||||
ReadOptions read_options;
|
||||
read_options.io_activity = Env::IOActivity::kCompaction;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(read_options, {cfh1}));
|
||||
ASSERT_TRUE(iter->status().IsNotSupported());
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(ReadOptions(), {}));
|
||||
ASSERT_TRUE(iter->status().IsInvalidArgument());
|
||||
}
|
||||
|
||||
{
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(ReadOptions(), {cfh1, cfh2}));
|
||||
ASSERT_TRUE(iter->status().IsInvalidArgument());
|
||||
}
|
||||
|
||||
{
|
||||
ReadOptions read_options;
|
||||
read_options.io_activity = Env::IOActivity::kCompaction;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> iter(
|
||||
txn->GetAttributeGroupIterator(read_options, {cfh1}));
|
||||
ASSERT_TRUE(iter->status().IsInvalidArgument());
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TransactionDBTest, CollapseKey) {
|
||||
ASSERT_OK(ReOpen());
|
||||
ASSERT_OK(db->Put({}, "hello", "world"));
|
||||
|
|
|
@ -3,13 +3,13 @@
|
|||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
|
||||
#include "utilities/transactions/write_prepared_txn.h"
|
||||
|
||||
#include <cinttypes>
|
||||
#include <map>
|
||||
#include <set>
|
||||
|
||||
#include "db/attribute_group_iterator_impl.h"
|
||||
#include "db/column_family.h"
|
||||
#include "db/db_impl/db_impl.h"
|
||||
#include "rocksdb/db.h"
|
||||
|
@ -135,6 +135,15 @@ Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
|
|||
return write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
|
||||
}
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator>
|
||||
WritePreparedTxn::GetAttributeGroupIterator(
|
||||
const ReadOptions& /* read_options */,
|
||||
const std::vector<ColumnFamilyHandle*>& /* column_families */) {
|
||||
return NewAttributeGroupErrorIterator(
|
||||
Status::NotSupported("GetAttributeGroupIterator not supported for "
|
||||
"write-prepared/write-unprepared transactions"));
|
||||
}
|
||||
|
||||
Status WritePreparedTxn::PrepareInternal() {
|
||||
WriteOptions write_options = write_options_;
|
||||
write_options.disableWAL = false;
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
|
@ -71,6 +70,10 @@ class WritePreparedTxn : public PessimisticTransaction {
|
|||
Iterator* GetIterator(const ReadOptions& options,
|
||||
ColumnFamilyHandle* column_family) override;
|
||||
|
||||
std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
|
||||
const ReadOptions& read_options,
|
||||
const std::vector<ColumnFamilyHandle*>& column_families) override;
|
||||
|
||||
void SetSnapshot() override;
|
||||
|
||||
protected:
|
||||
|
|
Loading…
Reference in a new issue