rocksdb/utilities/transactions/write_committed_transaction_ts_test.cc
Yu Zhang 719c96125c Add a TransactionOptions to enable tracking timestamp size info inside WriteBatch (#12864)
Summary:
In normal use cases, meta info like column family's timestamp size is tracked at the transaction layer, so it's not necessary and even detrimental to track such info inside the internal WriteBatch because it may let anti-patterns like bypassing Transaction write APIs and directly write to its internal WriteBatch like this:
9d0a754dc9/storage/rocksdb/ha_rocksdb.cc (L4949-L4950)
Setting this option to true will keep aforementioned use case continue to work before it's refactored out. This option is only for this purpose and it will be gradually deprecated after aforementioned MyRocks use case are refactored.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12864

Test Plan: Added unit tests

Reviewed By: cbi42

Differential Revision: D60194094

Pulled By: jowlyzhang

fbshipit-source-id: 64a98822167e99aa7e4fa2a60085d44a5deaa45c
2024-08-05 13:06:45 -07:00

1109 lines
39 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/testutil.h"
#include "utilities/merge_operators.h"
#include "utilities/transactions/transaction_test.h"
namespace ROCKSDB_NAMESPACE {
INSTANTIATE_TEST_CASE_P(
DBAsBaseDB, WriteCommittedTxnWithTsTest,
::testing::Values(std::make_tuple(false, /*two_write_queue=*/false,
/*enable_indexing=*/false),
std::make_tuple(false, /*two_write_queue=*/true,
/*enable_indexing=*/false),
std::make_tuple(false, /*two_write_queue=*/false,
/*enable_indexing=*/true),
std::make_tuple(false, /*two_write_queue=*/true,
/*enable_indexing=*/true)));
INSTANTIATE_TEST_CASE_P(
DBAsStackableDB, WriteCommittedTxnWithTsTest,
::testing::Values(std::make_tuple(true, /*two_write_queue=*/false,
/*enable_indexing=*/false),
std::make_tuple(true, /*two_write_queue=*/true,
/*enable_indexing=*/false),
std::make_tuple(true, /*two_write_queue=*/false,
/*enable_indexing=*/true),
std::make_tuple(true, /*two_write_queue=*/true,
/*enable_indexing=*/true)));
TEST_P(WriteCommittedTxnWithTsTest, SanityChecks) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
ASSERT_OK(txn->Put(handles_[1], "foo", "value"));
ASSERT_TRUE(txn->Commit().IsInvalidArgument());
auto* pessimistic_txn =
static_cast_with_check<PessimisticTransaction>(txn.get());
ASSERT_TRUE(
pessimistic_txn->CommitBatch(/*batch=*/nullptr).IsInvalidArgument());
{
WriteBatchWithIndex* wbwi = txn->GetWriteBatch();
assert(wbwi);
WriteBatch* wb = wbwi->GetWriteBatch();
assert(wb);
// Write a key to the batch for nonexisting cf.
ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"",
/*value=*/""));
}
ASSERT_OK(txn->SetCommitTimestamp(20));
ASSERT_TRUE(txn->Commit().IsInvalidArgument());
txn.reset();
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Put(handles_[1], "foo", "value"));
{
WriteBatchWithIndex* wbwi = txn1->GetWriteBatch();
assert(wbwi);
WriteBatch* wb = wbwi->GetWriteBatch();
assert(wb);
// Write a key to the batch for non-existing cf.
ASSERT_OK(WriteBatchInternal::Put(wb, /*column_family_id=*/10, /*key=*/"",
/*value=*/""));
}
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(21));
ASSERT_TRUE(txn1->Commit().IsInvalidArgument());
txn1.reset();
}
void CheckKeyValueTsWithIterator(
Iterator* iter,
std::vector<std::tuple<std::string, std::string, std::string>> entries) {
size_t num_entries = entries.size();
// test forward iteration
for (size_t i = 0; i < num_entries; i++) {
auto [key, value, timestamp] = entries[i];
if (i == 0) {
iter->Seek(key);
} else {
iter->Next();
}
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), key);
ASSERT_EQ(iter->value(), value);
ASSERT_EQ(iter->timestamp(), timestamp);
}
// test backward iteration
for (size_t i = 0; i < num_entries; i++) {
auto [key, value, timestamp] = entries[num_entries - 1 - i];
if (i == 0) {
iter->SeekForPrev(key);
} else {
iter->Prev();
}
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key(), key);
ASSERT_EQ(iter->value(), value);
ASSERT_EQ(iter->timestamp(), timestamp);
}
}
// This is an incorrect usage of this API, supporting this should be removed
// after MyRocks remove this pattern in a refactor.
TEST_P(WriteCommittedTxnWithTsTest, WritesBypassTransactionAPIs) {
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
ASSERT_OK(ReOpen());
const std::string test_cf_name = "test_cf";
ColumnFamilyOptions cf_options;
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
// Write in each transaction a mixture of column families that enable
// timestamp and disable timestamps.
TransactionOptions txn_opts;
txn_opts.write_batch_track_timestamp_size = true;
std::unique_ptr<Transaction> txn0(NewTxn(WriteOptions(), txn_opts));
assert(txn0);
ASSERT_OK(txn0->Put(handles_[0], "key1", "key1_val"));
// Timestamp size info for writes like this can only be correctly tracked if
// TransactionOptions.write_batch_track_timestamp_size is true.
ASSERT_OK(txn0->GetWriteBatch()->GetWriteBatch()->Put(handles_[1], "foo",
"foo_val"));
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->SetCommitTimestamp(2));
ASSERT_OK(txn0->Prepare());
ASSERT_OK(txn0->Commit());
txn0.reset();
// For keys written from transactions that disable
// `write_batch_track_timestamp_size`
// The keys has incorrect behavior like:
// *Cannot be found after commit: because transaction's UpdateTimestamp do not
// have correct timestamp size when this write bypass transaction write APIs.
// *Can be found again after DB restart recovers the write from WAL log:
// because recovered transaction's UpdateTimestamp get correct timestamp size
// info directly from VersionSet.
// If there is a flush that persisted this transaction into sst files after
// it's committed, the key will be forever corrupted.
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
ASSERT_OK(txn1->Put(handles_[0], "key2", "key2_val"));
// Writing a key with more than 8 bytes so that we can manifest the error as
// a NotFound error instead of an issue during `WriteBatch::UpdateTimestamp`.
ASSERT_OK(txn1->GetWriteBatch()->GetWriteBatch()->Put(
handles_[1], "foobarbaz", "baz_val"));
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->SetCommitTimestamp(2));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->Commit());
txn1.reset();
ASSERT_OK(db->Flush(FlushOptions(), handles_[1]));
std::unique_ptr<Transaction> txn2(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn2);
ASSERT_OK(txn2->Put(handles_[0], "key3", "key3_val"));
ASSERT_OK(txn2->GetWriteBatch()->GetWriteBatch()->Put(
handles_[1], "bazbazbaz", "bazbazbaz_val"));
ASSERT_OK(txn2->SetCommitTimestamp(2));
ASSERT_OK(txn2->SetName("txn2"));
ASSERT_OK(txn2->Prepare());
ASSERT_OK(txn2->Commit());
txn2.reset();
std::unique_ptr<Transaction> txn3(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn3);
std::string value;
ReadOptions ropts;
std::string read_ts;
Slice timestamp = EncodeU64Ts(2, &read_ts);
ropts.timestamp = &timestamp;
ASSERT_OK(txn3->Get(ropts, handles_[0], "key1", &value));
ASSERT_EQ("key1_val", value);
ASSERT_OK(txn3->Get(ropts, handles_[0], "key2", &value));
ASSERT_EQ("key2_val", value);
ASSERT_OK(txn3->Get(ropts, handles_[0], "key3", &value));
ASSERT_EQ("key3_val", value);
txn3.reset();
std::unique_ptr<Transaction> txn4(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn4);
ASSERT_OK(txn4->Get(ReadOptions(), handles_[1], "foo", &value));
ASSERT_EQ("foo_val", value);
// Incorrect behavior: committed keys cannot be found
ASSERT_TRUE(
txn4->Get(ReadOptions(), handles_[1], "foobarbaz", &value).IsNotFound());
ASSERT_TRUE(
txn4->Get(ReadOptions(), handles_[1], "bazbazbaz", &value).IsNotFound());
txn4.reset();
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn5(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn5);
ASSERT_OK(txn5->Get(ReadOptions(), handles_[1], "foo", &value));
ASSERT_EQ("foo_val", value);
// Incorrect behavior:
// *unflushed key can be found after reopen replays the entries from WAL
// (this is not suggesting using flushing as a workaround but to show a
// possible misleading behavior)
// *flushed key is forever corrupted.
ASSERT_TRUE(
txn5->Get(ReadOptions(), handles_[1], "foobarbaz", &value).IsNotFound());
ASSERT_OK(txn5->Get(ReadOptions(), handles_[1], "bazbazbaz", &value));
ASSERT_EQ("bazbazbaz_val", value);
txn5.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, ReOpenWithTimestamp) {
options.merge_operator = MergeOperators::CreateUInt64AddOperator();
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
ASSERT_OK(txn0->Put(handles_[1], "foo", "value"));
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Prepare());
ASSERT_TRUE(txn0->Commit().IsInvalidArgument());
txn0.reset();
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
std::string write_ts;
uint64_t write_ts_int = 23;
PutFixed64(&write_ts, write_ts_int);
ReadOptions read_opts;
std::string read_ts;
PutFixed64(&read_ts, write_ts_int + 1);
Slice read_ts_slice = read_ts;
read_opts.timestamp = &read_ts_slice;
ASSERT_OK(txn1->Put(handles_[1], "bar", "value0"));
ASSERT_OK(txn1->Put(handles_[1], "foo", "value1"));
// (key, value, ts) pairs to check.
std::vector<std::tuple<std::string, std::string, std::string>>
entries_to_check;
entries_to_check.emplace_back("bar", "value0", "");
entries_to_check.emplace_back("foo", "value1", "");
{
std::string buf;
PutFixed64(&buf, 23);
ASSERT_OK(txn1->Put("id", buf));
ASSERT_OK(txn1->Merge("id", buf));
}
// Check (key, value, ts) with overwrites in txn before `SetCommitTimestamp`.
if (std::get<2>(GetParam())) { // enable_indexing = true
std::unique_ptr<Iterator> iter(txn1->GetIterator(read_opts, handles_[1]));
CheckKeyValueTsWithIterator(iter.get(), entries_to_check);
}
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(write_ts_int));
// Check (key, value, ts) with overwrites in txn after `SetCommitTimestamp`.
if (std::get<2>(GetParam())) { // enable_indexing = true
std::unique_ptr<Iterator> iter(txn1->GetIterator(read_opts, handles_[1]));
CheckKeyValueTsWithIterator(iter.get(), entries_to_check);
}
ASSERT_OK(txn1->Commit());
entries_to_check.clear();
entries_to_check.emplace_back("bar", "value0", write_ts);
entries_to_check.emplace_back("foo", "value1", write_ts);
// Check (key, value, ts) pairs with overwrites in txn after `Commit`.
{
std::unique_ptr<Iterator> iter(txn1->GetIterator(read_opts, handles_[1]));
CheckKeyValueTsWithIterator(iter.get(), entries_to_check);
}
txn1.reset();
{
std::string value;
const Status s =
GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("value1", value);
}
{
std::string value;
const Status s = db->Get(ReadOptions(), handles_[0], "id", &value);
ASSERT_OK(s);
uint64_t ival = 0;
Slice value_slc = value;
bool result = GetFixed64(&value_slc, &ival);
assert(result);
ASSERT_EQ(46, ival);
}
// Check (key, value, ts) pairs without overwrites in txn.
{
std::unique_ptr<Transaction> txn2(
NewTxn(WriteOptions(), TransactionOptions()));
std::unique_ptr<Iterator> iter(txn2->GetIterator(read_opts, handles_[1]));
CheckKeyValueTsWithIterator(iter.get(), entries_to_check);
}
}
TEST_P(WriteCommittedTxnWithTsTest, RecoverFromWal) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_opts;
cf_opts.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_opts, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_opts);
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
ASSERT_OK(txn0->Put(handles_[1], "foo", "foo_value"));
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Prepare());
WriteOptions write_opts;
write_opts.sync = true;
std::unique_ptr<Transaction> txn1(NewTxn(write_opts, TransactionOptions()));
assert(txn1);
ASSERT_OK(txn1->Put("bar", "bar_value_1"));
ASSERT_OK(txn1->Put(handles_[1], "bar", "bar_value_1"));
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(/*ts=*/23));
ASSERT_OK(txn1->Commit());
txn1.reset();
std::unique_ptr<Transaction> txn2(NewTxn(write_opts, TransactionOptions()));
assert(txn2);
ASSERT_OK(txn2->Put("key1", "value_3"));
ASSERT_OK(txn2->Put(handles_[1], "key1", "value_3"));
ASSERT_OK(txn2->SetCommitTimestamp(/*ts=*/24));
ASSERT_OK(txn2->Commit());
txn2.reset();
txn0.reset();
std::unique_ptr<Transaction> txn3(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn3);
ASSERT_OK(txn3->Put(handles_[1], "baz", "baz_value"));
ASSERT_OK(txn3->SetName("txn3"));
ASSERT_OK(txn3->Prepare());
txn3.reset();
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
{
Transaction* recovered_txn0 = db->GetTransactionByName("txn0");
ASSERT_OK(recovered_txn0->SetCommitTimestamp(23));
ASSERT_OK(recovered_txn0->Commit());
delete recovered_txn0;
std::string value;
Status s = GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("foo_value", value);
s = db->Get(ReadOptions(), handles_[0], "bar", &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_1", value);
value.clear();
s = GetFromDb(ReadOptions(), handles_[1], "bar", /*ts=*/23, &value);
ASSERT_OK(s);
ASSERT_EQ("bar_value_1", value);
s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/23, &value);
ASSERT_TRUE(s.IsNotFound());
s = db->Get(ReadOptions(), handles_[0], "key1", &value);
ASSERT_OK(s);
ASSERT_EQ("value_3", value);
s = GetFromDb(ReadOptions(), handles_[1], "key1", /*ts=*/24, &value);
ASSERT_OK(s);
ASSERT_EQ("value_3", value);
s = GetFromDb(ReadOptions(), handles_[1], "baz", /*ts=*/24, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_P(WriteCommittedTxnWithTsTest, TransactionDbLevelApi) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, cf_options);
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::string key_str = "tes_key";
std::string ts_str;
std::string value_str = "test_value";
PutFixed64(&ts_str, 100);
Slice value = value_str;
assert(db);
ASSERT_TRUE(
db->Put(WriteOptions(), handles_[1], "foo", "bar").IsNotSupported());
ASSERT_TRUE(db->Delete(WriteOptions(), handles_[1], "foo").IsNotSupported());
ASSERT_TRUE(
db->SingleDelete(WriteOptions(), handles_[1], "foo").IsNotSupported());
ASSERT_TRUE(
db->Merge(WriteOptions(), handles_[1], "foo", "+1").IsNotSupported());
WriteBatch wb1(/*reserved_bytes=*/0, /*max_bytes=*/0,
/*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0);
ASSERT_OK(wb1.Put(handles_[1], key_str, ts_str, value));
ASSERT_TRUE(db->Write(WriteOptions(), &wb1).IsNotSupported());
ASSERT_TRUE(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb1)
.IsNotSupported());
auto* pessimistic_txn_db =
static_cast_with_check<PessimisticTransactionDB>(db);
assert(pessimistic_txn_db);
ASSERT_TRUE(
pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb1)
.IsNotSupported());
ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
ASSERT_OK(db->Put(WriteOptions(), "bar", "value"));
ASSERT_OK(db->Delete(WriteOptions(), "bar"));
ASSERT_OK(db->SingleDelete(WriteOptions(), "foo"));
ASSERT_OK(db->Put(WriteOptions(), "key", "value"));
ASSERT_OK(db->Merge(WriteOptions(), "key", "_more"));
WriteBatch wb2(/*reserved_bytes=*/0, /*max_bytes=*/0,
/*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0);
ASSERT_OK(wb2.Put(key_str, value));
ASSERT_OK(db->Write(WriteOptions(), &wb2));
ASSERT_OK(db->Write(WriteOptions(), TransactionDBWriteOptimizations(), &wb2));
ASSERT_OK(
pessimistic_txn_db->WriteWithConcurrencyControl(WriteOptions(), &wb2));
std::unique_ptr<Transaction> txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
WriteBatch wb3(/*reserved_bytes=*/0, /*max_bytes=*/0,
/*protection_bytes_per_key=*/0, /*default_cf_ts_sz=*/0);
ASSERT_OK(wb3.Put(handles_[1], "key", "value"));
auto* pessimistic_txn =
static_cast_with_check<PessimisticTransaction>(txn.get());
assert(pessimistic_txn);
ASSERT_TRUE(pessimistic_txn->CommitBatch(&wb3).IsNotSupported());
txn.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, Merge) {
options.merge_operator = MergeOperators::CreateStringAppendOperator();
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
cf_options.merge_operator = MergeOperators::CreateStringAppendOperator();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn);
ASSERT_OK(txn->Put(handles_[1], "foo", "bar"));
ASSERT_OK(txn->Merge(handles_[1], "foo", "1"));
ASSERT_OK(txn->SetCommitTimestamp(24));
ASSERT_OK(txn->Commit());
txn.reset();
{
std::string value;
const Status s =
GetFromDb(ReadOptions(), handles_[1], "foo", /*ts=*/24, &value);
ASSERT_OK(s);
ASSERT_EQ("bar,1", value);
}
}
TEST_P(WriteCommittedTxnWithTsTest, GetForUpdate) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
// Not set read timestamp, use blind write
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn1->Put(handles_[1], "key", "value1"));
ASSERT_OK(txn1->Put(handles_[1], "foo", "value1"));
ASSERT_OK(txn1->SetCommitTimestamp(24));
ASSERT_OK(txn1->Commit());
txn1.reset();
// Set read timestamp, use it for validation in GetForUpdate, validation fail
// with conflict: timestamp from db(24) > read timestamp(23)
std::string value;
ASSERT_OK(txn0->SetReadTimestampForValidation(23));
ASSERT_TRUE(
txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value).IsBusy());
ASSERT_OK(txn0->Rollback());
txn0.reset();
// Set read timestamp, use it for validation in GetForUpdate, validation pass
// with no conflict: timestamp from db(24) < read timestamp (25)
std::unique_ptr<Transaction> txn2(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn2->SetReadTimestampForValidation(25));
ASSERT_OK(txn2->GetForUpdate(ReadOptions(), handles_[1], "key", &value));
// Use a different read timestamp in ReadOptions while doing validation is
// invalid.
ReadOptions read_options;
std::string read_timestamp;
Slice diff_read_ts = EncodeU64Ts(24, &read_timestamp);
read_options.timestamp = &diff_read_ts;
ASSERT_TRUE(txn2->GetForUpdate(read_options, handles_[1], "foo", &value)
.IsInvalidArgument());
ASSERT_OK(txn2->SetCommitTimestamp(26));
ASSERT_OK(txn2->Commit());
txn2.reset();
// Set read timestamp, call GetForUpdate without validation, invalid
std::unique_ptr<Transaction> txn3(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn3->SetReadTimestampForValidation(27));
ASSERT_TRUE(txn3->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/*exclusive=*/true, /*do_validate=*/false)
.IsInvalidArgument());
ASSERT_OK(txn3->Rollback());
txn3.reset();
// Not set read timestamp, call GetForUpdate with validation, invalid
std::unique_ptr<Transaction> txn4(
NewTxn(WriteOptions(), TransactionOptions()));
// ReadOptions.timestamp is not set, invalid
ASSERT_TRUE(txn4->GetForUpdate(ReadOptions(), handles_[1], "key", &value)
.IsInvalidArgument());
// ReadOptions.timestamp is set, also invalid.
// `SetReadTimestampForValidation` must have been called with the same
// timestamp as in ReadOptions.timestamp for validation.
Slice read_ts = EncodeU64Ts(27, &read_timestamp);
read_options.timestamp = &read_ts;
ASSERT_TRUE(txn4->GetForUpdate(read_options, handles_[1], "key", &value)
.IsInvalidArgument());
ASSERT_OK(txn4->Rollback());
txn4.reset();
// Not set read timestamp, call GetForUpdate without validation, pass
std::unique_ptr<Transaction> txn5(
NewTxn(WriteOptions(), TransactionOptions()));
// ReadOptions.timestamp is not set, pass
ASSERT_OK(txn5->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/*exclusive=*/true, /*do_validate=*/false));
// ReadOptions.timestamp explicitly set to max timestamp, pass
Slice max_ts = MaxU64Ts();
read_options.timestamp = &max_ts;
ASSERT_OK(txn5->GetForUpdate(read_options, handles_[1], "foo", &value,
/*exclusive=*/true, /*do_validate=*/false));
// NOTE: this commit timestamp is smaller than the db's timestamp (26), but
// this commit can still go through, that breaks the user-defined timestamp
// invariant: newer user-defined timestamp should have newer sequence number.
// So be aware of skipping UDT based validation. Unless users have their own
// ways to ensure the UDT invariant is met, DO NOT skip it. Ways to ensure
// the UDT invariant include: manage a monotonically increasing timestamp,
// commit transactions in a single thread etc.
ASSERT_OK(txn5->SetCommitTimestamp(3));
ASSERT_OK(txn5->Commit());
txn5.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, GetForUpdateUdtValidationNotEnabled) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
txn_db_options.enable_udt_validation = false;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
// blind write a key/value for latter read via `GetForUpdate`.
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn0->Put(handles_[1], "key", "value0"));
ASSERT_OK(txn0->SetCommitTimestamp(20));
ASSERT_OK(txn0->Commit());
// When timestamp validation is disabled across the whole DB
// `SetReadTimestampForValidation` should not be called.
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
std::string value;
ASSERT_OK(txn1->SetReadTimestampForValidation(21));
ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/* exclusive= */ true, /*do_validate=*/true)
.IsInvalidArgument());
txn1.reset();
// do_validate and no snapshot, no conflict checking at all
std::unique_ptr<Transaction> txn2(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn2->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/* exclusive= */ true, /*do_validate=*/true));
ASSERT_OK(txn2->Put(handles_[1], "key", "value1"));
ASSERT_OK(txn2->SetCommitTimestamp(21));
ASSERT_OK(txn2->Commit());
txn2.reset();
// do_validate and set snapshot, execute sequence number based conflict
// checking and skip timestamp based conflict checking.
std::unique_ptr<Transaction> txn3(
NewTxn(WriteOptions(), TransactionOptions()));
txn3->SetSnapshot();
ASSERT_OK(txn3->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/* exclusive= */ true, /*do_validate=*/true));
ASSERT_OK(txn3->Put(handles_[1], "key", "value2"));
ASSERT_OK(txn3->SetCommitTimestamp(22));
ASSERT_OK(txn3->Commit());
txn3.reset();
// Always check `ReadOptions.timestamp` to be consistent with the default
// `read_timestamp_` if it's explicitly set, even if whole DB disables
// timestamp validation.
std::unique_ptr<Transaction> txn4(
NewTxn(WriteOptions(), TransactionOptions()));
ReadOptions ropts;
std::string read_timestamp;
Slice read_ts = EncodeU64Ts(27, &read_timestamp);
ropts.timestamp = &read_ts;
ASSERT_TRUE(txn4->GetForUpdate(ropts, handles_[1], "key", &value,
/* exclusive= */ true, /*do_validate=*/true)
.IsInvalidArgument());
txn4.reset();
// Conflict of timestamps not caught when parallel transactions commit with
// some out of order timestamps.
std::unique_ptr<Transaction> txn5(
db->BeginTransaction(WriteOptions(), TransactionOptions()));
assert(txn5);
std::unique_ptr<Transaction> txn6(
db->BeginTransaction(WriteOptions(), TransactionOptions()));
assert(txn6);
ASSERT_OK(txn6->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/* exclusive= */ true, /*do_validate=*/true));
ASSERT_OK(txn6->Put(handles_[1], "key", "value4"));
ASSERT_OK(txn6->SetName("txn6"));
ASSERT_OK(txn6->Prepare());
ASSERT_OK(txn6->SetCommitTimestamp(24));
ASSERT_OK(txn6->Commit());
txn6.reset();
txn5->SetSnapshot();
ASSERT_OK(txn5->GetForUpdate(ReadOptions(), handles_[1], "key", &value,
/* exclusive= */ true, /*do_validate=*/true));
ASSERT_OK(txn5->Put(handles_[1], "key", "value3"));
ASSERT_OK(txn5->SetName("txn5"));
// txn5 commits after txn6 but writes a smaller timestamp
ASSERT_OK(txn5->SetCommitTimestamp(23));
ASSERT_OK(txn5->Commit());
txn5.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, BlindWrite) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
{
std::string value;
ASSERT_OK(txn0->SetReadTimestampForValidation(100));
// Lock "key".
ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key", &value)
.IsNotFound());
}
ASSERT_OK(txn0->Put(handles_[1], "key", "value0"));
ASSERT_OK(txn0->SetCommitTimestamp(101));
ASSERT_OK(txn0->Commit());
ASSERT_OK(txn1->Put(handles_[1], "key", "value1"));
// In reality, caller needs to ensure commit_ts of txn1 is greater than the
// commit_ts of txn0, which is true for lock-based concurrency control.
ASSERT_OK(txn1->SetCommitTimestamp(102));
ASSERT_OK(txn1->Commit());
txn0.reset();
txn1.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, RefineReadTimestamp) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
assert(db);
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
delete cfh;
cfh = nullptr;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
cf_descs.emplace_back(test_cf_name, Options(DBOptions(), cf_options));
options.avoid_flush_during_shutdown = true;
ASSERT_OK(ReOpenNoDelete(cf_descs, &handles_));
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn0);
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
assert(txn1);
{
ASSERT_OK(txn0->SetReadTimestampForValidation(100));
// Lock "key0", "key1", ..., "key4".
for (int i = 0; i < 5; ++i) {
std::string value;
ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1],
"key" + std::to_string(i), &value)
.IsNotFound());
}
}
ASSERT_OK(txn1->Put(handles_[1], "key5", "value5_0"));
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->SetCommitTimestamp(101));
ASSERT_OK(txn1->Commit());
txn1.reset();
{
std::string value;
ASSERT_TRUE(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value)
.IsBusy());
ASSERT_OK(txn0->SetReadTimestampForValidation(102));
ASSERT_OK(txn0->GetForUpdate(ReadOptions(), handles_[1], "key5", &value));
ASSERT_EQ("value5_0", value);
}
for (int i = 0; i < 6; ++i) {
ASSERT_OK(txn0->Put(handles_[1], "key" + std::to_string(i),
"value" + std::to_string(i)));
}
ASSERT_OK(txn0->SetName("txn0"));
ASSERT_OK(txn0->Prepare());
ASSERT_OK(txn0->SetCommitTimestamp(103));
ASSERT_OK(txn0->Commit());
txn0.reset();
}
TEST_P(WriteCommittedTxnWithTsTest, CheckKeysForConflicts) {
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
ASSERT_OK(ReOpen());
std::unique_ptr<Transaction> txn1(
db->BeginTransaction(WriteOptions(), TransactionOptions()));
assert(txn1);
std::unique_ptr<Transaction> txn2(
db->BeginTransaction(WriteOptions(), TransactionOptions()));
assert(txn2);
ASSERT_OK(txn2->Put("foo", "v0"));
ASSERT_OK(txn2->SetCommitTimestamp(10));
ASSERT_OK(txn2->Commit());
txn2.reset();
// txn1 takes a snapshot after txn2 commits. The writes of txn2 have
// a smaller seqno than txn1's snapshot, thus should not affect conflict
// checking.
txn1->SetSnapshot();
std::unique_ptr<Transaction> txn3(
db->BeginTransaction(WriteOptions(), TransactionOptions()));
assert(txn3);
ASSERT_OK(txn3->SetReadTimestampForValidation(20));
std::string dontcare;
ASSERT_OK(txn3->GetForUpdate(ReadOptions(), "foo", &dontcare));
ASSERT_OK(txn3->SingleDelete("foo"));
ASSERT_OK(txn3->SetName("txn3"));
ASSERT_OK(txn3->Prepare());
ASSERT_OK(txn3->SetCommitTimestamp(30));
// txn3 reads at ts=20 > txn2's commit timestamp, and commits at ts=30.
// txn3 can commit successfully, leaving a tombstone with ts=30.
ASSERT_OK(txn3->Commit());
txn3.reset();
bool called = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::GetLatestSequenceForKey:mem", [&](void* arg) {
auto* const ts_ptr = static_cast<std::string*>(arg);
assert(ts_ptr);
Slice ts_slc = *ts_ptr;
uint64_t last_ts = 0;
ASSERT_TRUE(GetFixed64(&ts_slc, &last_ts));
ASSERT_EQ(30, last_ts);
called = true;
});
SyncPoint::GetInstance()->EnableProcessing();
// txn1's read timestamp is 25 < 30 (commit timestamp of txn3). Therefore,
// the tombstone written by txn3 causes the conflict checking to fail.
ASSERT_OK(txn1->SetReadTimestampForValidation(25));
ASSERT_TRUE(txn1->GetForUpdate(ReadOptions(), "foo", &dontcare).IsBusy());
ASSERT_TRUE(called);
Transaction* reused_txn =
db->BeginTransaction(WriteOptions(), TransactionOptions(), txn1.get());
ASSERT_EQ(reused_txn, txn1.get());
ASSERT_OK(reused_txn->Put("foo", "v1"));
ASSERT_OK(reused_txn->SetCommitTimestamp(40));
ASSERT_OK(reused_txn->Commit());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WriteCommittedTxnWithTsTest, GetEntityForUpdate) {
ASSERT_OK(ReOpenNoDelete());
ColumnFamilyOptions cf_options;
cf_options.comparator = test::BytewiseComparatorWithU64TsWrapper();
const std::string test_cf_name = "test_cf";
ColumnFamilyHandle* cfh = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, test_cf_name, &cfh));
std::unique_ptr<ColumnFamilyHandle> cfh_guard(cfh);
constexpr char foo[] = "foo";
constexpr char bar[] = "bar";
constexpr char baz[] = "baz";
constexpr char quux[] = "quux";
{
std::unique_ptr<Transaction> txn0(
NewTxn(WriteOptions(), TransactionOptions()));
{
std::unique_ptr<Transaction> txn1(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn1->Put(cfh, foo, bar));
ASSERT_OK(txn1->Put(cfh, baz, quux));
ASSERT_OK(txn1->SetCommitTimestamp(24));
ASSERT_OK(txn1->Commit());
}
ASSERT_OK(txn0->SetReadTimestampForValidation(23));
// Validation fails: timestamp from db(24) > validation timestamp(23)
PinnableWideColumns columns;
ASSERT_TRUE(
txn0->GetEntityForUpdate(ReadOptions(), cfh, foo, &columns).IsBusy());
ASSERT_OK(txn0->Rollback());
}
{
std::unique_ptr<Transaction> txn2(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn2->SetReadTimestampForValidation(25));
// Validation successful: timestamp from db(24) < validation timestamp (25)
{
PinnableWideColumns columns;
ASSERT_OK(txn2->GetEntityForUpdate(ReadOptions(), cfh, foo, &columns));
}
// Using a different read timestamp in ReadOptions while doing validation is
// not allowed
{
ReadOptions read_options;
std::string read_timestamp;
Slice diff_read_ts = EncodeU64Ts(24, &read_timestamp);
read_options.timestamp = &diff_read_ts;
PinnableWideColumns columns;
ASSERT_TRUE(txn2->GetEntityForUpdate(read_options, cfh, foo, &columns)
.IsInvalidArgument());
ASSERT_OK(txn2->SetCommitTimestamp(26));
ASSERT_OK(txn2->Commit());
}
}
// GetEntityForUpdate with validation timestamp set but no validation is not
// allowed
{
std::unique_ptr<Transaction> txn3(
NewTxn(WriteOptions(), TransactionOptions()));
ASSERT_OK(txn3->SetReadTimestampForValidation(27));
PinnableWideColumns columns;
ASSERT_TRUE(txn3->GetEntityForUpdate(ReadOptions(), cfh, foo, &columns,
/*exclusive=*/true,
/*do_validate=*/false)
.IsInvalidArgument());
ASSERT_OK(txn3->Rollback());
}
// GetEntityForUpdate with validation but no validation timestamp is not
// allowed
{
std::unique_ptr<Transaction> txn4(
NewTxn(WriteOptions(), TransactionOptions()));
// ReadOptions.timestamp is not set
{
PinnableWideColumns columns;
ASSERT_TRUE(txn4->GetEntityForUpdate(ReadOptions(), cfh, foo, &columns)
.IsInvalidArgument());
}
// ReadOptions.timestamp is set
{
ReadOptions read_options;
std::string read_timestamp;
Slice read_ts = EncodeU64Ts(27, &read_timestamp);
read_options.timestamp = &read_ts;
PinnableWideColumns columns;
ASSERT_TRUE(txn4->GetEntityForUpdate(read_options, cfh, foo, &columns)
.IsInvalidArgument());
}
ASSERT_OK(txn4->Rollback());
}
// Validation disabled
{
std::unique_ptr<Transaction> txn5(
NewTxn(WriteOptions(), TransactionOptions()));
// ReadOptions.timestamp is not set => success
{
PinnableWideColumns columns;
ASSERT_OK(txn5->GetEntityForUpdate(ReadOptions(), cfh, foo, &columns,
/*exclusive=*/true,
/*do_validate=*/false));
}
// ReadOptions.timestamp explicitly set to max timestamp => success
{
ReadOptions read_options;
Slice max_ts = MaxU64Ts();
read_options.timestamp = &max_ts;
PinnableWideColumns columns;
ASSERT_OK(txn5->GetEntityForUpdate(read_options, cfh, baz, &columns,
/*exclusive=*/true,
/*do_validate=*/false));
}
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}