rocksdb/utilities/transactions/write_prepared_transaction_test.cc
sdong afa3518839 Revert "Update googletest from 1.8.1 to 1.10.0 (#6808)" (#6923)
Summary:
This reverts commit 8d87e9cea1.

Based on offline discussions, it's too early to upgrade to gtest 1.10, as it prevents some developers from using an older version of gtest to integrate to some other systems. Revert it for now.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6923

Reviewed By: pdillinger

Differential Revision: D21864799

fbshipit-source-id: d0726b1ff649fc911b9378f1763316200bd363fc
2020-06-03 15:55:03 -07:00

3525 lines
136 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/transactions/transaction_test.h"
#include <algorithm>
#include <atomic>
#include <cinttypes>
#include <functional>
#include <string>
#include <thread>
#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
#include "table/mock_table.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "test_util/transaction_test_util.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/transactions/pessimistic_transaction_db.h"
#include "utilities/transactions/write_prepared_txn_db.h"
#include "port/port.h"
using std::string;
namespace ROCKSDB_NAMESPACE {
using CommitEntry = WritePreparedTxnDB::CommitEntry;
using CommitEntry64b = WritePreparedTxnDB::CommitEntry64b;
using CommitEntry64bFormat = WritePreparedTxnDB::CommitEntry64bFormat;
// Walks a single PreparedHeap through push, erase and pop and checks after
// each step that top() reports the smallest sequence number still alive.
TEST(PreparedHeap, BasicsTest) {
  WritePreparedTxnDB::PreparedHeap heap;
  {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(14l);
    // A lone element is the top.
    ASSERT_EQ(14l, heap.top());
    for (const long seq : {24l, 34l}) {
      heap.push(seq);
    }
    // Larger values must not displace the existing minimum.
    ASSERT_EQ(14l, heap.top());
    for (const long seq : {44l, 54l, 64l, 74l, 84l}) {
      heap.push(seq);
    }
  }
  // The minimum is still unchanged after all the pushes.
  ASSERT_EQ(14l, heap.top());

  // Erasing a non-top element leaves the top alone.
  heap.erase(24l);
  ASSERT_EQ(14l, heap.top());
  // Erasing the top exposes the next value that has not been erased (24 is
  // already gone).
  heap.erase(14l);
  ASSERT_EQ(34l, heap.top());
  // A single erase of the top promotes its direct successor.
  heap.erase(34l);
  ASSERT_EQ(44l, heap.top());
  heap.erase(54l);
  ASSERT_EQ(44l, heap.top());

  // Popping 44 must also skip over the previously erased 54.
  heap.pop();
  ASSERT_EQ(64l, heap.top());
  // Erasing a value that was already popped is tolerated.
  heap.erase(44l);
  ASSERT_EQ(64l, heap.top());
  heap.erase(84l);
  ASSERT_EQ(64l, heap.top());

  {
    MutexLock ml(heap.push_pop_mutex());
    for (const long seq : {85l, 86l, 87l, 88l, 89l}) {
      heap.push(seq);
    }
  }
  // Erase the new batch in a shuffled order; the top must not move.
  for (const long seq : {87l, 85l, 89l, 86l, 88l}) {
    heap.erase(seq);
  }
  ASSERT_EQ(64l, heap.top());

  // pop() has to cope with the backlog of pending erases.
  heap.pop();
  ASSERT_EQ(74l, heap.top());
  ASSERT_FALSE(heap.empty());
  heap.pop();
  // Nothing is left once the last live element is popped.
  ASSERT_TRUE(heap.empty());
}
// Regression test rebuilt from a trace of a past bug: erasing the top element
// must not make the heap report empty while a larger element is still in it.
TEST(PreparedHeap, EmptyAtTheEnd) {
  WritePreparedTxnDB::PreparedHeap heap;
  // Pushes a single sequence number while holding the heap's push/pop mutex.
  auto locked_push = [&heap](long seq) {
    MutexLock ml(heap.push_pop_mutex());
    heap.push(seq);
  };

  // Round 1: the larger element is erased before the top.
  locked_push(40l);
  ASSERT_EQ(40l, heap.top());
  // Erasing a value that was never pushed is discouraged, but the heap has to
  // survive it.
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  locked_push(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  ASSERT_TRUE(heap.empty());

  // Round 2: same setup, but the top is erased while 60 is still live.
  locked_push(40l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(50l);
  ASSERT_EQ(40l, heap.top());
  locked_push(60l);
  ASSERT_EQ(40l, heap.top());
  heap.erase(40l);
  // The buggy version reported an empty heap at this point.
  ASSERT_FALSE(heap.empty());
  ASSERT_EQ(60l, heap.top());
  heap.erase(60l);
  ASSERT_TRUE(heap.empty());
}
// Generate random order of PreparedHeap access and test that the heap will be
// successfully emptied at the end.
// One producer thread pushes sequence numbers 1..t_cnt (randomly skipping
// some) while t_cnt eraser threads each erase exactly one sequence number.
// The scenario is repeated 100 times against the same heap object.
TEST(PreparedHeap, Concurrent) {
const size_t t_cnt = 10;
// t[0] is the producer; t[1..t_cnt] are the erasers.
ROCKSDB_NAMESPACE::port::Thread t[t_cnt + 1];
WritePreparedTxnDB::PreparedHeap heap;
// Taken in write mode by every eraser around its erase() call.
port::RWMutex prepared_mutex;
// Most recent sequence number the producer pushed in the current round.
std::atomic<size_t> last;
for (size_t n = 0; n < 100; n++) {
last = 0;
t[0] = ROCKSDB_NAMESPACE::port::Thread([&]() {
// Constructed with the same constant seed in every round.
Random rnd(1103);
for (size_t seq = 1; seq <= t_cnt; seq++) {
// This is not recommended usage but we should be resilient against it.
bool skip_push = rnd.OneIn(5);
if (!skip_push) {
MutexLock ml(heap.push_pop_mutex());
std::this_thread::yield();
heap.push(seq);
// Published only after the push, and only for sequence numbers that were
// actually pushed.
last.store(seq);
}
}
});
for (size_t i = 1; i <= t_cnt; i++) {
t[i] =
ROCKSDB_NAMESPACE::port::Thread([&heap, &prepared_mutex, &last, i]() {
auto seq = i;
// Spin until the producer has pushed this sequence number or a later one.
// NOTE(review): `last` never advances for skipped pushes, so this loop
// would not terminate for an eraser whose seq is larger than every pushed
// value (e.g. if seq == t_cnt were skipped). Presumably the fixed seed
// avoids that; confirm before changing the seed or t_cnt.
do {
std::this_thread::yield();
} while (last.load() < seq);
WriteLock wl(&prepared_mutex);
// May target a sequence number that was never pushed (skipped above).
heap.erase(seq);
});
}
for (size_t i = 0; i <= t_cnt; i++) {
t[i].join();
}
// Every pushed sequence number has been erased by now.
ASSERT_TRUE(heap.empty());
}
}
// Checks that WriteBatchWithIndex::SubBatchCnt() follows the number of
// sub-batches as keys are added, is restored by RollbackToSavePoint(), and
// agrees with SubBatchCounter for both hand-built and random batches.
TEST(WriteBatchWithIndex, SubBatchCnt) {
  const std::string dbname = test::PerThreadDBPath("transaction_testdb");
  Options options;
  options.create_if_missing = true;
  DestroyDB(dbname, options);
  DB* db;
  ASSERT_OK(DB::Open(options, dbname, &db));

  ColumnFamilyOptions cf_options;
  std::string cf_name = "two";
  ColumnFamilyHandle* cf_handle = nullptr;
  ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
  WriteOptions write_options;

  size_t batch_cnt = 1;
  size_t save_points = 0;
  // batch_cnt_at[i] is the sub-batch count in effect when save point i was set.
  std::vector<size_t> batch_cnt_at;
  WriteBatchWithIndex batch(db->DefaultColumnFamily()->GetComparator(), 0, true,
                            0);
  // Remembers the current expected count and opens a new save point.
  auto mark_save_point = [&]() {
    batch_cnt_at.push_back(batch_cnt);
    batch.SetSavePoint();
    save_points++;
  };

  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
  mark_save_point();
  batch.Put(Slice("key"), Slice("value"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  mark_save_point();
  batch.Put(Slice("key2"), Slice("value2"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  // A repeated key starts a new sub-batch.
  mark_save_point();
  batch.Put(Slice("key"), Slice("value3"));
  batch_cnt++;
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  // Repeating the 2nd key is not a duplicate any more: a sub-batch was cut
  // after the last duplicate.
  mark_save_point();
  batch.Put(Slice("key2"), Slice("value4"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  // The same key in another column family does not count as a duplicate.
  mark_save_point();
  batch.Put(cf_handle, Slice("key"), Slice("value5"));
  ASSERT_EQ(batch_cnt, batch.SubBatchCnt());

  // The count must match what SubBatchCounter computes by iterating the batch.
  std::map<uint32_t, const Comparator*> comparators;
  comparators[0] = db->DefaultColumnFamily()->GetComparator();
  comparators[cf_handle->GetID()] = cf_handle->GetComparator();
  SubBatchCounter counter(comparators);
  ASSERT_OK(batch.GetWriteBatch()->Iterate(&counter));
  ASSERT_EQ(batch_cnt, counter.BatchCount());

  // Rolling back, newest save point first, must restore each recorded count.
  for (size_t remaining = save_points; remaining > 0; --remaining) {
    batch.RollbackToSavePoint();
    ASSERT_EQ(batch_cnt_at[remaining - 1], batch.SubBatchCnt());
  }

  // Cross-check the two counters on random batches.
  {
    // Key pool of 20 against 10 puts per batch, so repeated keys within a
    // batch are likely.
    const size_t TOTAL_KEYS = 20;
    Random rnd(1131);
    std::string keys[TOTAL_KEYS];
    for (std::string& key : keys) {
      const int len = static_cast<int>(rnd.Uniform(50));
      key = test::RandomKey(&rnd, len);
    }
    for (size_t round = 0; round < 1000; ++round) {  // 1000 random batches
      WriteBatchWithIndex rndbatch(db->DefaultColumnFamily()->GetComparator(),
                                   0, true, 0);
      for (size_t put = 0; put < 10; ++put) {  // 10 keys per batch
        const size_t ki = static_cast<size_t>(rnd.Uniform(TOTAL_KEYS));
        std::string buffer;
        const Slice value = Slice(test::RandomString(&rnd, 16, &buffer));
        rndbatch.Put(Slice(keys[ki]), value);
      }
      SubBatchCounter batch_counter(comparators);
      ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
      ASSERT_EQ(rndbatch.SubBatchCnt(), batch_counter.BatchCount());
    }
  }
  delete cf_handle;
  delete db;
}
// Round-trips CommitEntry values through the packed CommitEntry64b
// representation and checks that Parse() returns what was encoded, for
// prepare/commit pairs sampled across the index and commit bit ranges.
TEST(CommitEntry64b, BasicTest) {
  const size_t INDEX_BITS = static_cast<size_t>(21);
  const size_t INDEX_SIZE = static_cast<size_t>(1ull << INDEX_BITS);
  const CommitEntry64bFormat FORMAT(static_cast<size_t>(INDEX_BITS));

  // zero-initialized CommitEntry64b should indicate an empty entry
  CommitEntry64b empty_entry64b;
  uint64_t empty_index = 11ul;
  CommitEntry empty_entry;
  bool ok = empty_entry64b.Parse(empty_index, &empty_entry, FORMAT);
  ASSERT_FALSE(ok);

  // the zero entry is reserved for un-initialized entries
  // The shift is done on a 64-bit operand: shifting a plain int overflows
  // (undefined behavior) once COMMIT_BITS reaches the width of int.
  const size_t MAX_COMMIT =
      static_cast<size_t>((1ull << FORMAT.COMMIT_BITS) - 1 - 1);
  // Samples over the numbers that are covered by that many index bits
  std::array<uint64_t, 4> is = {{0, 1, INDEX_SIZE / 2 + 1, INDEX_SIZE - 1}};
  // Samples over the numbers that are covered by that many commit bits
  std::array<uint64_t, 4> ds = {{0, 1, MAX_COMMIT / 2 + 1, MAX_COMMIT}};
  // Iterate over prepare numbers that i) cover all bits of a sequence number,
  // and ii) include some bits that fall into the range of index or commit
  // bits
  for (uint64_t base = 1; base < kMaxSequenceNumber; base *= 2) {
    for (uint64_t i : is) {
      for (uint64_t d : ds) {
        uint64_t p = base + i + d;
        // Commit at the prepare seq, half the delta later, and the full delta
        // later.
        for (uint64_t c : {p, p + d / 2, p + d}) {
          uint64_t index = p % INDEX_SIZE;
          CommitEntry before(p, c), after;
          CommitEntry64b entry64b(before, FORMAT);
          ok = entry64b.Parse(index, &after, FORMAT);
          ASSERT_TRUE(ok);
          if (!(before == after)) {
            // Dump the failing combination before the assertion below fires.
            printf("base %" PRIu64 " i %" PRIu64 " d %" PRIu64 " p %" PRIu64
                   " c %" PRIu64 " index %" PRIu64 "\n",
                   base, i, d, p, c, index);
          }
          ASSERT_EQ(before, after);
        }
      }
    }
  }
}
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
public:
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
: WritePreparedTxnDB(db_impl, opt) {}
void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
snapshots_ = snapshots;
}
void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
protected:
const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber /* unused */) override {
return snapshots_;
}
private:
std::vector<SequenceNumber> snapshots_;
};
class WritePreparedTransactionTestBase : public TransactionTestBase {
public:
WritePreparedTransactionTestBase(bool use_stackable_db, bool two_write_queue,
TxnDBWritePolicy write_policy,
WriteOrdering write_ordering)
: TransactionTestBase(use_stackable_db, two_write_queue, write_policy,
write_ordering){};
protected:
void UpdateTransactionDBOptions(size_t snapshot_cache_bits,
size_t commit_cache_bits) {
txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
txn_db_options.wp_commit_cache_bits = commit_cache_bits;
}
void UpdateTransactionDBOptions(size_t snapshot_cache_bits) {
txn_db_options.wp_snapshot_cache_bits = snapshot_cache_bits;
}
// If expect_update is set, check if it actually updated old_commit_map_. If
// it did not and yet suggested not to check the next snapshot, do the
// opposite to check if it was not a bad suggestion.
void MaybeUpdateOldCommitMapTestWithNext(uint64_t prepare, uint64_t commit,
uint64_t snapshot,
uint64_t next_snapshot,
bool expect_update) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// reset old_commit_map_empty_ so that its value indicate whether
// old_commit_map_ was updated
wp_db->old_commit_map_empty_ = true;
bool check_next = wp_db->MaybeUpdateOldCommitMap(prepare, commit, snapshot,
snapshot < next_snapshot);
if (expect_update == wp_db->old_commit_map_empty_) {
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
" next: %" PRIu64 "\n",
prepare, commit, snapshot, next_snapshot);
}
EXPECT_EQ(!expect_update, wp_db->old_commit_map_empty_);
if (!check_next && wp_db->old_commit_map_empty_) {
// do the opposite to make sure it was not a bad suggestion
const bool dont_care_bool = true;
wp_db->MaybeUpdateOldCommitMap(prepare, commit, next_snapshot,
dont_care_bool);
if (!wp_db->old_commit_map_empty_) {
printf("prepare: %" PRIu64 " commit: %" PRIu64 " snapshot: %" PRIu64
" next: %" PRIu64 "\n",
prepare, commit, snapshot, next_snapshot);
}
EXPECT_TRUE(wp_db->old_commit_map_empty_);
}
}
// Test that a CheckAgainstSnapshots thread reading old_snapshots will not
// miss a snapshot because of a concurrent update by UpdateSnapshots that is
// writing new_snapshots. Both threads are broken at two points. The sync
// points to enforce them are specified by a1, a2, b1, and b2. CommitEntry
// entry is expected to be vital for one of the snapshots that is common
// between the old and new list of snapshots.
void SnapshotConcurrentAccessTestInternal(
WritePreparedTxnDB* wp_db,
const std::vector<SequenceNumber>& old_snapshots,
const std::vector<SequenceNumber>& new_snapshots, CommitEntry& entry,
SequenceNumber& version, size_t a1, size_t a2, size_t b1, size_t b2) {
// First reset the snapshot list
const std::vector<SequenceNumber> empty_snapshots;
wp_db->old_commit_map_empty_ = true;
wp_db->UpdateSnapshots(empty_snapshots, ++version);
// Then initialize it with the old_snapshots
wp_db->UpdateSnapshots(old_snapshots, ++version);
// Starting from the first thread, cut each thread at two points
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a1),
"WritePreparedTxnDB::UpdateSnapshots:s:start"},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b1),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a1)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(a2),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b1)},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(b2),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(a2)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:end",
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(b2)},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_TRUE(wp_db->old_commit_map_empty_);
ROCKSDB_NAMESPACE::port::Thread t1(
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
ROCKSDB_NAMESPACE::port::Thread t2(
[&]() { wp_db->CheckAgainstSnapshots(entry); });
t1.join();
t2.join();
ASSERT_FALSE(wp_db->old_commit_map_empty_);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
wp_db->old_commit_map_empty_ = true;
wp_db->UpdateSnapshots(empty_snapshots, ++version);
wp_db->UpdateSnapshots(old_snapshots, ++version);
// Starting from the second thread, cut each thread at two points
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a1),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:start"},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b1),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a1)},
{"WritePreparedTxnDB::UpdateSnapshots:p:" + std::to_string(a2),
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b1)},
{"WritePreparedTxnDB::CheckAgainstSnapshots:p:" + std::to_string(b2),
"WritePreparedTxnDB::UpdateSnapshots:s:" + std::to_string(a2)},
{"WritePreparedTxnDB::UpdateSnapshots:p:end",
"WritePreparedTxnDB::CheckAgainstSnapshots:s:" + std::to_string(b2)},
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
{
ASSERT_TRUE(wp_db->old_commit_map_empty_);
ROCKSDB_NAMESPACE::port::Thread t1(
[&]() { wp_db->UpdateSnapshots(new_snapshots, version); });
ROCKSDB_NAMESPACE::port::Thread t2(
[&]() { wp_db->CheckAgainstSnapshots(entry); });
t1.join();
t2.join();
ASSERT_FALSE(wp_db->old_commit_map_empty_);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
// Verify value of keys.
void VerifyKeys(const std::unordered_map<std::string, std::string>& data,
const Snapshot* snapshot = nullptr) {
std::string value;
ReadOptions read_options;
read_options.snapshot = snapshot;
for (auto& kv : data) {
auto s = db->Get(read_options, kv.first, &value);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
if (kv.second != value) {
printf("key = %s\n", kv.first.c_str());
}
ASSERT_EQ(kv.second, value);
} else {
ASSERT_EQ(kv.second, "NOT_FOUND");
}
// Try with MultiGet API too
std::vector<std::string> values;
auto s_vec = db->MultiGet(read_options, {db->DefaultColumnFamily()},
{kv.first}, &values);
ASSERT_EQ(1, values.size());
ASSERT_EQ(1, s_vec.size());
s = s_vec[0];
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_TRUE(kv.second == values[0]);
} else {
ASSERT_EQ(kv.second, "NOT_FOUND");
}
}
}
// Verify all versions of keys.
void VerifyInternalKeys(const std::vector<KeyVersion>& expected_versions) {
std::vector<KeyVersion> versions;
const size_t kMaxKeys = 100000;
ASSERT_OK(GetAllKeyVersions(db, expected_versions.front().user_key,
expected_versions.back().user_key, kMaxKeys,
&versions));
ASSERT_EQ(expected_versions.size(), versions.size());
for (size_t i = 0; i < versions.size(); i++) {
ASSERT_EQ(expected_versions[i].user_key, versions[i].user_key);
ASSERT_EQ(expected_versions[i].sequence, versions[i].sequence);
ASSERT_EQ(expected_versions[i].type, versions[i].type);
if (versions[i].type != kTypeDeletion &&
versions[i].type != kTypeSingleDeletion) {
ASSERT_EQ(expected_versions[i].value, versions[i].value);
}
// Range delete not supported.
assert(expected_versions[i].type != kTypeRangeDeletion);
}
}
};
// Value-parameterized fixture. The parameter tuple is forwarded, in order, to
// the base constructor as
// (use_stackable_db, two_write_queue, write_policy, write_ordering).
class WritePreparedTransactionTest
    : public WritePreparedTransactionTestBase,
      public virtual ::testing::WithParamInterface<
          std::tuple<bool, bool, TxnDBWritePolicy, WriteOrdering>> {
 public:
  WritePreparedTransactionTest()
      : WritePreparedTransactionTestBase(
            std::get<0>(GetParam()), std::get<1>(GetParam()),
            std::get<2>(GetParam()), std::get<3>(GetParam())) {}
};
#ifndef ROCKSDB_VALGRIND_RUN
// Value-parameterized fixture whose tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering, split_id,
// split_cnt). The first four elements go to the base constructor; the last
// two identify which slice of the test this instance runs.
class SnapshotConcurrentAccessTest
    : public WritePreparedTransactionTestBase,
      public virtual ::testing::WithParamInterface<std::tuple<
          bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
 public:
  SnapshotConcurrentAccessTest()
      : WritePreparedTransactionTestBase(
            std::get<0>(GetParam()), std::get<1>(GetParam()),
            std::get<2>(GetParam()), std::get<3>(GetParam())),
        split_id_(std::get<4>(GetParam())),
        split_cnt_(std::get<5>(GetParam())) {}

 protected:
  // The test is divided into split_cnt_ parts; this instance is part
  // split_id_, with 0 <= split_id_ < split_cnt_.
  size_t split_id_;
  size_t split_cnt_;
};
#endif // ROCKSDB_VALGRIND_RUN
// Value-parameterized fixture whose tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering, split_id,
// split_cnt). The first four elements go to the base constructor; the last
// two identify which slice of the test this instance runs.
class SeqAdvanceConcurrentTest
    : public WritePreparedTransactionTestBase,
      public virtual ::testing::WithParamInterface<std::tuple<
          bool, bool, TxnDBWritePolicy, WriteOrdering, size_t, size_t>> {
 public:
  SeqAdvanceConcurrentTest()
      : WritePreparedTransactionTestBase(
            std::get<0>(GetParam()), std::get<1>(GetParam()),
            std::get<2>(GetParam()), std::get<3>(GetParam())),
        split_id_(std::get<4>(GetParam())),
        split_cnt_(std::get<5>(GetParam())) {}

 protected:
  // The test is divided into split_cnt_ parts; this instance is part
  // split_id_, with 0 <= split_id_ < split_cnt_.
  size_t split_id_;
  size_t split_cnt_;
};
// Parameter sets for WritePreparedTransactionTest. Each tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering):
// one write queue with ordered writes, then two write queues with ordered and
// with unordered writes.
INSTANTIATE_TEST_CASE_P(
WritePreparedTransaction, WritePreparedTransactionTest,
::testing::Values(
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite)));
#ifndef ROCKSDB_VALGRIND_RUN
// SnapshotConcurrentAccessTest with two write queues. Each tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering, split_id,
// split_cnt): split ids 0..19 of 20, first for ordered and then for unordered
// writes.
INSTANTIATE_TEST_CASE_P(
TwoWriteQueues, SnapshotConcurrentAccessTest,
::testing::Values(
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 10, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 11, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 12, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 13, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 14, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 15, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 16, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 17, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 18, 20),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 19, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 10, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 11, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 12, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 13, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 14, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 15, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 16, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 17, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 18, 20),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 19, 20)));
// SnapshotConcurrentAccessTest with a single write queue. Each tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering, split_id,
// split_cnt): split ids 0..19 of 20, ordered writes only.
INSTANTIATE_TEST_CASE_P(
OneWriteQueue, SnapshotConcurrentAccessTest,
::testing::Values(
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 10, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 11, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 12, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 13, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 14, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 15, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 16, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 17, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 18, 20),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 19, 20)));
// SeqAdvanceConcurrentTest with two write queues. Each tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering, split_id,
// split_cnt): split ids 0..9 of 10, first for ordered and then for unordered
// writes.
INSTANTIATE_TEST_CASE_P(
TwoWriteQueues, SeqAdvanceConcurrentTest,
::testing::Values(
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 0, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 1, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 2, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 3, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 4, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 5, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 6, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 7, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 8, 10),
std::make_tuple(false, true, WRITE_PREPARED, kOrderedWrite, 9, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 0, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 1, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 2, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 3, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 4, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 5, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 6, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 7, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 8, 10),
std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite, 9, 10)));
// SeqAdvanceConcurrentTest with a single write queue. Each tuple is
// (use_stackable_db, two_write_queue, write_policy, write_ordering, split_id,
// split_cnt): split ids 0..9 of 10, ordered writes only.
INSTANTIATE_TEST_CASE_P(
OneWriteQueue, SeqAdvanceConcurrentTest,
::testing::Values(
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 0, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 1, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 2, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 3, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 4, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 5, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 6, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 7, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 8, 10),
std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite, 9, 10)));
#endif // ROCKSDB_VALGRIND_RUN
// Exercises AddCommitEntry, GetCommitEntry and ExchangeCommitEntry on the
// commit cache, using prepare sequence numbers that differ by a multiple of
// the cache size and therefore share one cache slot.
TEST_P(WritePreparedTransactionTest, CommitMap) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  assert(wp_db);
  assert(wp_db->db_impl_);
  const size_t cache_size = wp_db->COMMIT_CACHE_SIZE;

  CommitEntry first = {5, 12};
  CommitEntry fetched;
  // Every entry used below, except the "non-existent" probe, lands in this
  // slot because its prepare seq equals first.prep_seq plus a multiple of
  // cache_size.
  const uint64_t slot = first.prep_seq % cache_size;
  ASSERT_FALSE(wp_db->AddCommitEntry(slot, first, &fetched));

  // Should be able to read the same value
  CommitEntry64b raw_unused;
  ASSERT_TRUE(wp_db->GetCommitEntry(slot, &raw_unused, &fetched));
  ASSERT_EQ(first, fetched);

  // Should be able to distinguish between overlapping entries: looking up
  // prep_seq + cache_size hits the same slot but yields the original prep_seq.
  ASSERT_TRUE(wp_db->GetCommitEntry(slot, &raw_unused, &fetched));
  ASSERT_NE(first.prep_seq + cache_size, fetched.prep_seq);

  // Should be able to detect non-existent entry
  ASSERT_FALSE(wp_db->GetCommitEntry((first.prep_seq + 1) % cache_size,
                                     &raw_unused, &fetched));

  // Reject an invalid exchange: the expected entry does not match the slot's
  // current content.
  CommitEntry mismatched = {first.prep_seq + cache_size,
                            first.commit_seq + cache_size};
  CommitEntry64b mismatched_64b(mismatched, wp_db->FORMAT);
  ASSERT_FALSE(wp_db->ExchangeCommitEntry(slot, mismatched_64b, fetched));
  // check whether it did actually reject that
  ASSERT_TRUE(wp_db->GetCommitEntry(slot, &raw_unused, &fetched));
  ASSERT_EQ(first, fetched);

  // Accept a valid exchange
  CommitEntry64b first_64b(first, wp_db->FORMAT);
  CommitEntry replacement = {first.prep_seq + cache_size,
                             first.commit_seq + cache_size + 1};
  ASSERT_TRUE(wp_db->ExchangeCommitEntry(slot, first_64b, replacement));
  // check whether it did actually accept that
  ASSERT_TRUE(wp_db->GetCommitEntry(slot, &raw_unused, &fetched));
  ASSERT_EQ(replacement, fetched);

  // Rewrite an entry: adding to an occupied slot evicts the previous entry.
  CommitEntry overwrite = {replacement.prep_seq + cache_size,
                           replacement.commit_seq + cache_size + 1};
  ASSERT_TRUE(wp_db->AddCommitEntry(slot, overwrite, &fetched));
  ASSERT_EQ(replacement, fetched);
  ASSERT_TRUE(wp_db->GetCommitEntry(slot, &raw_unused, &fetched));
  ASSERT_EQ(overwrite, fetched);
}
// Table-driven check of MaybeUpdateOldCommitMap through
// MaybeUpdateOldCommitMapTestWithNext.
TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
  // If prepare <= snapshot < commit we should keep the entry around since its
  // nonexistence could be interpreted as committed in the snapshot while it is
  // not true. We keep such entries around by adding them to the
  // old_commit_map_.
  struct Scenario {
    uint64_t prepare;
    uint64_t commit;
    uint64_t snapshot;
    uint64_t next_snapshot;
    bool expect_update;
  };
  // Rows that do not expect an update are also tried with a next snapshot
  // below the current one. This verifies that MaybeUpdateOldCommitMap does not
  // stop us from checking a next snapshot that must be checked.
  const Scenario scenarios[] = {
      // commit <= snapshot: no update expected
      {10, 15, 20, 21, false},
      {10, 15, 20, 11, false},
      {10, 20, 20, 19, false},
      {10, 20, 20, 21, false},
      {20, 20, 20, 21, false},
      {20, 20, 20, 19, false},
      // prepare <= snapshot < commit: update expected
      {10, 25, 20, 21, true},
      {20, 25, 20, 21, true},
      // snapshot < prepare: no update expected
      {21, 25, 20, 22, false},
      {21, 25, 20, 19, false},
  };
  for (const Scenario& sc : scenarios) {
    MaybeUpdateOldCommitMapTestWithNext(sc.prepare, sc.commit, sc.snapshot,
                                        sc.next_snapshot, sc.expect_update);
  }
}
// Trigger the condition where some old memtables are skipped when doing
// TransactionUtil::CheckKey(), and make sure the result is still correct.
//
// The scenario runs twice. In the first attempt the memtable holding the
// earlier writes is flushed with db->Flush(); in the second attempt it is only
// switched to immutable, with the flush job held back by a sync point
// dependency until the end of the attempt.
TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
  const int kAttemptHistoryMemtable = 0;
  const int kAttemptImmMemTable = 1;
  for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
       attempt++) {
    options.max_write_buffer_number_to_maintain = 3;
    ReOpen();

    WriteOptions write_options;
    ReadOptions read_options;
    TransactionOptions txn_options;
    txn_options.set_snapshot = true;
    string value;

    ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
    ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));

    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn != nullptr);
    ASSERT_OK(txn->SetName("txn"));

    Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn2 != nullptr);
    ASSERT_OK(txn2->SetName("txn2"));

    // This transaction is created to cause potential conflict.
    Transaction* txn_x = db->BeginTransaction(write_options);
    ASSERT_TRUE(txn_x != nullptr);
    ASSERT_OK(txn_x->SetName("txn_x"));
    ASSERT_OK(txn_x->Put(Slice("foo"), Slice("bar3")));
    ASSERT_OK(txn_x->Prepare());

    // Create snapshots after the prepare, but there should still
    // be a conflict when trying to read "foo".

    if (attempt == kAttemptImmMemTable) {
      // For the second attempt, hold flush from beginning. The memtable
      // will be switched to immutable after calling TEST_SwitchMemtable()
      // while CheckKey() is called.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
          {{"WritePreparedTransactionTest.CheckKeySkipOldMemtable",
            "FlushJob::Start"}});
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    }

    // force a memtable flush. The memtable should still be kept
    FlushOptions flush_ops;
    if (attempt == kAttemptHistoryMemtable) {
      ASSERT_OK(db->Flush(flush_ops));
    } else {
      assert(attempt == kAttemptImmMemTable);
      DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
      // A failed switch would invalidate every memtable count checked below,
      // so do not drop the returned status.
      ASSERT_OK(db_impl->TEST_SwitchMemtable());
    }
    uint64_t num_imm_mems;
    ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
                                   &num_imm_mems));
    if (attempt == kAttemptHistoryMemtable) {
      ASSERT_EQ(0, num_imm_mems);
    } else {
      assert(attempt == kAttemptImmMemTable);
      ASSERT_EQ(1, num_imm_mems);
    }

    // Put something in active memtable
    ASSERT_OK(db->Put(write_options, Slice("foo3"), Slice("bar")));

    // Create txn3 after flushing, but this transaction also needs to
    // check all memtables because they contain uncommitted data.
    Transaction* txn3 = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn3 != nullptr);
    ASSERT_OK(txn3->SetName("txn3"));

    // Commit the pending write
    ASSERT_OK(txn_x->Commit());

    // Commit txn, txn2 and txn3. txn and txn3 will conflict but txn2 will
    // pass. In all cases, both memtables are queried.
    SetPerfLevel(PerfLevel::kEnableCount);
    get_perf_context()->Reset();
    ASSERT_TRUE(txn3->GetForUpdate(read_options, "foo", &value).IsBusy());
    // We should have checked two memtables, active and either immutable
    // or history memtable, depending on the test case.
    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);

    get_perf_context()->Reset();
    ASSERT_TRUE(txn->GetForUpdate(read_options, "foo", &value).IsBusy());
    // We should have checked two memtables, active and either immutable
    // or history memtable, depending on the test case.
    ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);

    get_perf_context()->Reset();
    ASSERT_OK(txn2->GetForUpdate(read_options, "foo2", &value));
    ASSERT_EQ(value, "bar");
    // We should have checked two memtables, and since there is no
    // conflict, another Get() will be made and fetch the data from
    // DB. If it is in immutable memtable, two extra memtable reads
    // will be issued. If it is not (in history), only one will
    // be made, which is to the active memtable.
    if (attempt == kAttemptHistoryMemtable) {
      ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
    } else {
      assert(attempt == kAttemptImmMemTable);
      ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
    }

    Transaction* txn4 = db->BeginTransaction(write_options, txn_options);
    ASSERT_TRUE(txn4 != nullptr);
    ASSERT_OK(txn4->SetName("txn4"));
    get_perf_context()->Reset();
    ASSERT_OK(txn4->GetForUpdate(read_options, "foo", &value));
    if (attempt == kAttemptHistoryMemtable) {
      // Active memtable will be checked in snapshot validation and when
      // getting the value.
      ASSERT_EQ(2, get_perf_context()->get_from_memtable_count);
    } else {
      // Only active memtable will be checked in snapshot validation but
      // both of active and immutable snapshot will be queried when
      // getting the value.
      assert(attempt == kAttemptImmMemTable);
      ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
    }

    ASSERT_OK(txn2->Commit());
    ASSERT_OK(txn4->Commit());

    // Release the flush job that was held back in the second attempt.
    TEST_SYNC_POINT("WritePreparedTransactionTest.CheckKeySkipOldMemtable");
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

    SetPerfLevel(PerfLevel::kDisable);

    delete txn;
    delete txn2;
    delete txn3;
    delete txn4;
    delete txn_x;
  }
}
// Reproduce the bug with two snapshots with the same sequence number and test
// that the release of the first snapshot will not affect the reads by the other
// snapshot
TEST_P(WritePreparedTransactionTest, DoubleSnapshot) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  // Seed the key with the value that the old snapshots are expected to see.
  ASSERT_OK(db->Put(WriteOptions(), "key", "value1"));

  // Prepare an overwrite of the key; it is committed only after the first
  // three snapshots are taken.
  TransactionOptions txn_opts;
  Transaction* writer =
      wp_db->BeginTransaction(WriteOptions(), txn_opts, nullptr);
  ASSERT_OK(writer->SetName("txn"));
  ASSERT_OK(writer->Put("key", "value2"));
  ASSERT_OK(writer->Prepare());

  // Three snapshots with the same seq number
  const Snapshot* snapshot0 = wp_db->GetSnapshot();
  const Snapshot* snapshot1 = wp_db->GetSnapshot();
  const Snapshot* snapshot2 = wp_db->GetSnapshot();

  ASSERT_OK(writer->Commit());
  SequenceNumber cache_size = wp_db->COMMIT_CACHE_SIZE;
  SequenceNumber overlap_seq = writer->GetId() + cache_size;
  delete writer;

  // 4th snapshot with a larger seq
  const Snapshot* snapshot3 = wp_db->GetSnapshot();

  // Cause an eviction to advance max evicted seq number
  // This also fetches the 4 snapshots from db since their seq is lower than the
  // new max
  wp_db->AddCommitted(overlap_seq, overlap_seq);

  // All reads below go through snapshot2, which predates the commit.
  ReadOptions snapshot_read;
  snapshot_read.snapshot = snapshot2;
  PinnableSlice fetched;

  // It should see the value before commit
  ASSERT_OK(wp_db->Get(snapshot_read, wp_db->DefaultColumnFamily(), "key",
                       &fetched));
  ASSERT_TRUE(fetched == "value1");
  fetched.Reset();

  wp_db->ReleaseSnapshot(snapshot1);

  // It should still see the value before commit
  ASSERT_OK(wp_db->Get(snapshot_read, wp_db->DefaultColumnFamily(), "key",
                       &fetched));
  ASSERT_TRUE(fetched == "value1");
  fetched.Reset();

  // Cause an eviction to advance max evicted seq number and trigger updating
  // the snapshot list
  overlap_seq += cache_size;
  wp_db->AddCommitted(overlap_seq, overlap_seq);

  // It should still see the value before commit
  ASSERT_OK(wp_db->Get(snapshot_read, wp_db->DefaultColumnFamily(), "key",
                       &fetched));
  ASSERT_TRUE(fetched == "value1");
  fetched.Reset();

  wp_db->ReleaseSnapshot(snapshot0);
  wp_db->ReleaseSnapshot(snapshot2);
  wp_db->ReleaseSnapshot(snapshot3);
}
size_t UniqueCnt(std::vector<SequenceNumber> vec) {
std::set<SequenceNumber> aset;
for (auto i : vec) {
aset.insert(i);
}
return aset.size();
}
// Test that the entries in old_commit_map_ get garbage collected properly:
// each released snapshot must take its evicted commit entries with it, and the
// map must report empty once the last overlapping snapshot is gone.
TEST_P(WritePreparedTransactionTest, OldCommitMapGC) {
  const size_t snapshot_cache_bits = 0;
  const size_t commit_cache_bits = 0;
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));

  // Records the commit of a prepared txn and then drops it from the prepared
  // set, in that order.
  auto commit_and_unprepare = [&wp_db](SequenceNumber prepare,
                                       SequenceNumber commit) {
    wp_db->AddCommitted(prepare, commit);
    wp_db->RemovePrepared(prepare);
  };

  SequenceNumber next_seq = 0;

  // Take the first snapshot that overlaps with two txn
  SequenceNumber first_prep = ++next_seq;
  wp_db->AddPrepared(first_prep);
  SequenceNumber second_prep = ++next_seq;
  wp_db->AddPrepared(second_prep);
  SequenceNumber snap_seq1 = next_seq;
  wp_db->TakeSnapshot(snap_seq1);
  SequenceNumber first_commit = ++next_seq;
  commit_and_unprepare(first_prep, first_commit);
  SequenceNumber second_commit = ++next_seq;
  commit_and_unprepare(second_prep, second_commit);

  // Take the 2nd and 3rd snapshot that overlap with the same txn
  SequenceNumber third_prep = ++next_seq;
  wp_db->AddPrepared(third_prep);
  SequenceNumber snap_seq2 = next_seq;
  wp_db->TakeSnapshot(snap_seq2);
  next_seq++;
  SequenceNumber snap_seq3 = next_seq;
  wp_db->TakeSnapshot(snap_seq3);
  next_seq++;
  SequenceNumber third_commit = ++next_seq;
  commit_and_unprepare(third_prep, third_commit);

  // Make sure max_evicted_seq_ will be larger than 2nd snapshot by evicting the
  // only item in the commit_cache_ via another commit.
  SequenceNumber evictor_prep = ++next_seq;
  wp_db->AddPrepared(evictor_prep);
  SequenceNumber evictor_commit = ++next_seq;
  commit_and_unprepare(evictor_prep, evictor_commit);

  // Verify that the evicted commit entries for all snapshots are in the
  // old_commit_map_
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(3, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq2]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 2nd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq2);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(2, wp_db->old_commit_map_.size());
    ASSERT_EQ(2, UniqueCnt(wp_db->old_commit_map_[snap_seq1]));
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 1st snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq1);
  {
    ASSERT_FALSE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(1, wp_db->old_commit_map_.size());
    ASSERT_EQ(1, UniqueCnt(wp_db->old_commit_map_[snap_seq3]));
  }

  // Verify that the 3rd snapshot is cleaned up after the release
  wp_db->ReleaseSnapshotInternal(snap_seq3);
  {
    ASSERT_TRUE(wp_db->old_commit_map_empty_.load());
    ReadLock rl(&wp_db->old_commit_map_mutex_);
    ASSERT_EQ(0, wp_db->old_commit_map_.size());
  }
}
// Feed CheckAgainstSnapshots commit entries that overlap with none, one or
// several of the registered snapshots and verify which of them end up
// recorded in old_commit_map_.
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
  std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, 500l,
                                           600l, 700l, 800l, 900l};
  const size_t snapshot_cache_bits = 2;
  const uint64_t cache_size = 1ul << snapshot_cache_bits;
  // Safety check to express the intended size in the test. Can be adjusted if
  // the snapshots lists changed.
  assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
  DBImpl* mock_db = new DBImpl(options, dbname);
  UpdateTransactionDBOptions(snapshot_cache_bits);
  std::unique_ptr<WritePreparedTxnDBMock> wp_db(
      new WritePreparedTxnDBMock(mock_db, txn_db_options));
  SequenceNumber version = 1000l;
  ASSERT_EQ(0, wp_db->snapshots_total_);
  wp_db->UpdateSnapshots(snapshots, version);
  ASSERT_EQ(snapshots.size(), wp_db->snapshots_total_);

  // seq numbers are chosen so that we have two of them between each two
  // snapshots. If the diff of two consecutive seq is more than 5, there is a
  // snapshot between them.
  std::vector<SequenceNumber> seqs = {50l,  55l,  150l, 155l, 250l, 255l, 350l,
                                      355l, 450l, 455l, 550l, 555l, 650l, 655l,
                                      750l, 755l, 850l, 855l, 950l, 955l};
  assert(seqs.size() > 1);
  for (size_t idx = 0; idx + 1 < seqs.size(); idx++) {
    wp_db->old_commit_map_empty_ = true;  // reset
    CommitEntry pair_entry = {seqs[idx], seqs[idx + 1]};
    wp_db->CheckAgainstSnapshots(pair_entry);
    // Expect update if there is snapshot in between the prepare and commit
    const bool spans_gap = pair_entry.commit_seq - pair_entry.prep_seq > 5;
    const bool within_snapshots =
        pair_entry.commit_seq >= snapshots.front() &&
        pair_entry.prep_seq <= snapshots.back();
    bool expect_update = spans_gap && within_snapshots;
    ASSERT_EQ(expect_update, !wp_db->old_commit_map_empty_);
  }

  // Starts from an empty old_commit_map_, runs the check for one entry and
  // reports how many snapshots got an entry recorded for them.
  auto recorded_snapshots = [&wp_db](const CommitEntry& entry) {
    wp_db->old_commit_map_empty_ = true;  // reset
    wp_db->old_commit_map_.clear();
    wp_db->CheckAgainstSnapshots(entry);
    return wp_db->old_commit_map_.size();
  };

  // Test that search will include multiple snapshot from snapshot cache
  {
    // exclude first and last item in the cache
    CommitEntry inner_cache = {snapshots.front() + 1,
                               snapshots[cache_size - 1] - 1};
    ASSERT_EQ(recorded_snapshots(inner_cache), cache_size - 2);
  }

  // Test that search will include multiple snapshot from old snapshots
  {
    // include two in the middle
    CommitEntry two_old = {snapshots[cache_size] + 1,
                           snapshots[cache_size + 2] + 1};
    ASSERT_EQ(recorded_snapshots(two_old), 2);
  }

  // Test that search will include both snapshot cache and old snapshots
  // Case 1: includes all in snapshot cache
  {
    CommitEntry everything = {snapshots.front() - 1, snapshots.back() + 1};
    ASSERT_EQ(recorded_snapshots(everything), snapshots.size());
  }

  // Case 2: includes all snapshot caches except the smallest
  {
    CommitEntry all_but_first = {snapshots.front() + 1, snapshots.back() + 1};
    ASSERT_EQ(recorded_snapshots(all_but_first), snapshots.size() - 1);
  }

  // Case 3: includes only the largest of snapshot cache
  {
    CommitEntry last_cached_onwards = {snapshots[cache_size - 1] - 1,
                                       snapshots.back() + 1};
    ASSERT_EQ(recorded_snapshots(last_cached_onwards),
              snapshots.size() - cache_size + 1);
  }
}
// This test is too slow for travis
#ifndef TRAVIS
#ifndef ROCKSDB_VALGRIND_RUN
// Test that CheckAgainstSnapshots will not miss a live snapshot if it is run in
// parallel with UpdateSnapshots.
//
// The test enumerates many (old snapshot list, new snapshot list) pairs and,
// for every snapshot present in both lists, hands a commit entry surrounding
// that snapshot to SnapshotConcurrentAccessTestInternal together with a pair
// of break points per thread (a1 < a2 and b1 < b2).
TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
// We have a sync point in the method under test after checking each snapshot.
// If you increase the max number of snapshots in this test, more sync points
// in the methods must also be added.
const std::vector<SequenceNumber> snapshots = {10l, 20l, 30l, 40l, 50l,
60l, 70l, 80l, 90l, 100l};
const size_t snapshot_cache_bits = 2;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
SequenceNumber version = 1000l;
// Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow.
DBImpl* mock_db = new DBImpl(options, dbname);
UpdateTransactionDBOptions(snapshot_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
const size_t extra = 2;
// Counts every (old_size, new_comb) combination visited so far; used below to
// decide which combinations this test instance executes.
size_t loop_id = 0;
// Add up to extra items that do not fit into the cache
for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + extra;
old_size++) {
// The old list is the first old_size entries of snapshots.
const std::vector<SequenceNumber> old_snapshots(
snapshots.begin(), snapshots.begin() + old_size);
// Each member of old snapshot might or might not appear in the new list. We
// create a common_snapshots for each combination.
size_t new_comb_cnt = size_t(1) << old_size;
for (size_t new_comb = 0; new_comb < new_comb_cnt; new_comb++, loop_id++) {
// Only run the combinations whose index modulo split_cnt_ equals split_id_.
if (loop_id % split_cnt_ != split_id_) continue;
printf("."); // To signal progress
fflush(stdout);
// NOTE(review): IsInCombination is defined elsewhere; presumably it tests
// whether bit i of new_comb is set -- confirm.
std::vector<SequenceNumber> common_snapshots;
for (size_t i = 0; i < old_snapshots.size(); i++) {
if (IsInCombination(i, new_comb)) {
common_snapshots.push_back(old_snapshots[i]);
}
}
// And add some new snapshots to the common list
for (size_t added_snapshots = 0;
added_snapshots <= snapshots.size() - old_snapshots.size();
added_snapshots++) {
// The new list is the common snapshots followed by the next
// added_snapshots entries of snapshots that are not in the old list.
std::vector<SequenceNumber> new_snapshots = common_snapshots;
for (size_t i = 0; i < added_snapshots; i++) {
new_snapshots.push_back(snapshots[old_snapshots.size() + i]);
}
for (auto it = common_snapshots.begin(); it != common_snapshots.end();
++it) {
auto snapshot = *it;
// Create a commit entry that is around the snapshot and thus should
// not be discarded
CommitEntry entry = {static_cast<uint64_t>(snapshot - 1),
snapshot + 1};
// The critical part is when iterating the snapshot cache. Afterwards,
// we are operating under the lock
size_t a_range =
std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
size_t b_range =
std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
// Break each thread at two points
for (size_t a1 = 1; a1 <= a_range; a1++) {
for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
for (size_t b1 = 1; b1 <= b_range; b1++) {
for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
SnapshotConcurrentAccessTestInternal(
wp_db.get(), old_snapshots, new_snapshots, entry, version,
a1, a2, b1, b2);
}
}
}
}
}
}
}
}
printf("\n");
}
#endif // ROCKSDB_VALGRIND_RUN
#endif // TRAVIS
// This test clarifies the contract of AdvanceMaxEvictedSeq method
//
// It seeds a mock DB with a list of prepared txns and snapshots, advances the
// max evicted seq twice (0 -> 100 -> 200) and then inspects max_evicted_seq_,
// delayed_prepared_, prepared_txns_ and snapshot_cache_.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasic) {
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
// 1. Set the initial values for max, prepared, and snapshots
SequenceNumber zero_max = 0l;
// Set the initial list of prepared txns
const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100,
150, 200, 250};
for (auto p : initial_prepared) {
wp_db->AddPrepared(p);
}
// This updates the max value and also set old prepared
SequenceNumber init_max = 100;
wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
const std::vector<SequenceNumber> initial_snapshots = {20, 40};
wp_db->SetDBSnapshots(initial_snapshots);
// This will update the internal cache of snapshots from the DB
wp_db->UpdateSnapshots(initial_snapshots, init_max);
// 2. Invoke AdvanceMaxEvictedSeq
const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
wp_db->SetDBSnapshots(latest_snapshots);
SequenceNumber new_max = 200;
wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
// 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
// a. max should be updated to new_max
ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
// b. delayed prepared should contain every txn <= max and prepared should
// only contain txns > max
// The first loop erases each expected entry from delayed_prepared_ so that the
// emptiness check afterwards proves there were no unexpected extras.
auto it = initial_prepared.begin();
for (; it != initial_prepared.end() && *it <= new_max; ++it) {
ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
// The second loop continues from the same iterator and pops prepared_txns_ in
// step; both must run out together.
for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
++it, wp_db->prepared_txns_.pop()) {
ASSERT_EQ(*it, wp_db->prepared_txns_.top());
}
ASSERT_TRUE(it == initial_prepared.end());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
// c. snapshots should contain everything below new_max
// NOTE(review): the loop condition already stops at snapshots_total_, so the
// first ASSERT_TRUE in the body can never fail and a too-small
// snapshots_total_ would silently shorten this check -- confirm whether that
// is intended.
auto sit = latest_snapshots.begin();
for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
i < wp_db->snapshots_total_;
sit++, i++) {
ASSERT_TRUE(i < wp_db->snapshots_total_);
// This test is in small scale and the list of snapshots are assumed to be
// within the cache size limit. This is just a safety check to double check
// that assumption.
ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
}
}
// A new snapshot should always be larger than max_evicted_seq_.
// Otherwise the snapshot does not go through AdvanceMaxEvictedSeq
TEST_P(WritePreparedTransactionTest, NewSnapshotLargerThanMax) {
  WriteOptions woptions;
  TransactionOptions txn_options;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  Transaction* txn0 = db->BeginTransaction(woptions, txn_options);
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value")));
  ASSERT_OK(txn0->Commit());
  const SequenceNumber seq = txn0->GetId();  // is also prepare seq
  delete txn0;
  std::vector<Transaction*> txns;
  // Inc seq without committing anything
  for (int i = 0; i < 10; i++) {
    Transaction* txn = db->BeginTransaction(woptions, txn_options);
    ASSERT_OK(txn->SetName("xid" + std::to_string(i)));
    ASSERT_OK(txn->Put(Slice("key" + std::to_string(i)), Slice("value")));
    ASSERT_OK(txn->Prepare());
    txns.push_back(txn);
  }

  // The new commit is seq + 10
  ASSERT_OK(db->Put(woptions, "key", "value"));
  auto snap = wp_db->GetSnapshot();
  const SequenceNumber last_seq = snap->GetSequenceNumber();
  wp_db->ReleaseSnapshot(snap);
  ASSERT_LT(seq, last_seq);
  // Otherwise our test is not effective
  ASSERT_LT(last_seq - seq, wp_db->INC_STEP_FOR_MAX_EVICTED);

  // Evict seq out of commit cache
  const SequenceNumber overwrite_seq = seq + wp_db->COMMIT_CACHE_SIZE;
  // Check that the next write could make max go beyond last
  auto last_max = wp_db->max_evicted_seq_.load();
  wp_db->AddCommitted(overwrite_seq, overwrite_seq);
  // Check that eviction has advanced the max
  ASSERT_LT(last_max, wp_db->max_evicted_seq_.load());
  // Check that the new max has not advanced the last seq
  ASSERT_LT(wp_db->max_evicted_seq_.load(), last_seq);

  // Clean up the prepared txns. A failed rollback is a test failure rather
  // than something to ignore.
  for (auto txn : txns) {
    ASSERT_OK(txn->Rollback());
    delete txn;
  }
}
// A new snapshot should always be larger than max_evicted_seq_.
// In very rare cases max could be below last published seq. Test that
// taking snapshot will wait for max to catch up.
TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WriteOptions woptions;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  const int writes = 50;
  const int batch_cnt = 4;
  // Writer thread. Statuses are checked with the non-fatal EXPECT_OK so that a
  // failed write is reported but the thread still finishes its loop.
  ROCKSDB_NAMESPACE::port::Thread t1([&]() {
    for (int i = 0; i < writes; i++) {
      WriteBatch batch;
      // For duplicate keys cause 4 commit entries, each evicting an entry that
      // is not published yet, thus causing max evicted seq go higher than last
      // published.
      for (int b = 0; b < batch_cnt; b++) {
        EXPECT_OK(batch.Put("foo", "foo"));
      }
      EXPECT_OK(db->Write(woptions, &batch));
    }
  });

  // Reader thread: repeatedly takes snapshots and compares them with the value
  // of max_evicted_seq_ sampled just before.
  ROCKSDB_NAMESPACE::port::Thread t2([&]() {
    while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
      std::this_thread::yield();
    }
    for (int i = 0; i < 10; i++) {
      SequenceNumber max_lower_bound = wp_db->max_evicted_seq_;
      auto snap = db->GetSnapshot();
      if (snap->GetSequenceNumber() != 0) {
        // Value of max_evicted_seq_ when snapshot was taken is unknown. We thus
        // compare with the lower bound instead as an approximation.
        ASSERT_LT(max_lower_bound, snap->GetSequenceNumber());
      }  // seq 0 is ok to be less than max since nothing is visible to it
      db->ReleaseSnapshot(snap);
    }
  });

  t1.join();
  t2.join();

  // Make sure that the test has worked and seq number has advanced as we
  // thought
  auto snap = db->GetSnapshot();
  ASSERT_GT(snap->GetSequenceNumber(), batch_cnt * writes - 1);
  db->ReleaseSnapshot(snap);
}
// Test that reads without snapshots would not hit an undefined state:
// while another thread keeps writing (and thus evicting commit entries), every
// read issued without an explicit snapshot must return either OK or TryAgain.
TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WriteOptions woptions;
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  const int writes = 50;
  // Writer thread. Statuses are checked with the non-fatal EXPECT_OK so that a
  // failed write is reported but the thread still finishes its loop.
  ROCKSDB_NAMESPACE::port::Thread t1([&]() {
    for (int i = 0; i < writes; i++) {
      WriteBatch batch;
      EXPECT_OK(batch.Put("key", "foo"));
      EXPECT_OK(db->Write(woptions, &batch));
    }
  });

  // Reader thread: exercises each read API without setting a snapshot.
  ROCKSDB_NAMESPACE::port::Thread t2([&]() {
    while (wp_db->max_evicted_seq_ == 0) {  // wait for insert thread
      std::this_thread::yield();
    }
    ReadOptions ropt;
    PinnableSlice pinnable_val;
    TransactionOptions txn_options;
    for (int i = 0; i < 10; i++) {
      // Plain DB::Get
      auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      pinnable_val.Reset();

      // Transaction::Get
      Transaction* txn = db->BeginTransaction(woptions, txn_options);
      s = txn->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      pinnable_val.Reset();

      // Transaction::MultiGet returning a vector of statuses
      std::vector<std::string> values;
      auto s_vec =
          txn->MultiGet(ropt, {db->DefaultColumnFamily()}, {"key"}, &values);
      ASSERT_EQ(1, values.size());
      ASSERT_EQ(1, s_vec.size());
      s = s_vec[0];
      ASSERT_TRUE(s.ok() || s.IsTryAgain());

      // Transaction::MultiGet writing into caller-provided arrays
      Slice key("key");
      txn->MultiGet(ropt, db->DefaultColumnFamily(), 1, &key, &pinnable_val, &s,
                    true);
      ASSERT_TRUE(s.ok() || s.IsTryAgain());
      delete txn;
    }
  });

  t1.join();
  t2.join();

  // Make sure that the test has worked and seq number has advanced as we
  // thought
  auto snap = db->GetSnapshot();
  ASSERT_GT(snap->GetSequenceNumber(), writes - 1);
  db->ReleaseSnapshot(snap);
}
// Check that old_commit_map_ cleanup works correctly if the snapshot equals
// max_evicted_seq_.
TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // only 1 entry => frequent eviction
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  WriteOptions woptions;

  // Every write in this test is the same single-key put.
  auto put_once = [&]() { return db->Put(woptions, "key", "value"); };

  // Insert something to increase seq
  ASSERT_OK(put_once());
  const Snapshot* pinned = db->GetSnapshot();
  const SequenceNumber pinned_seq = pinned->GetSequenceNumber();

  // Another insert should trigger eviction + load snapshot from db
  ASSERT_OK(put_once());
  // This is the scenario that we check against
  ASSERT_EQ(pinned_seq, wp_db->max_evicted_seq_);
  // old_commit_map_ now has some data that needs gc
  ASSERT_EQ(1, wp_db->snapshots_total_);
  ASSERT_EQ(1, wp_db->old_commit_map_.size());

  db->ReleaseSnapshot(pinned);

  // Another insert should trigger eviction + load snapshot from db
  ASSERT_OK(put_once());
  // the snapshot and related metadata must be properly garbage collected
  ASSERT_EQ(0, wp_db->snapshots_total_);
  ASSERT_TRUE(wp_db->snapshots_all_.empty());
  ASSERT_EQ(0, wp_db->old_commit_map_.size());
}
// Verify that AdvanceSeqByOne moves forward the sequence number observed by a
// freshly taken snapshot.
TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) {
  // Takes a snapshot, reads its sequence number and releases it right away.
  auto latest_snapshot_seq = [&]() {
    auto snapshot = db->GetSnapshot();
    auto snapshot_seq = snapshot->GetSequenceNumber();
    db->ReleaseSnapshot(snapshot);
    return snapshot_seq;
  };

  const auto seq_before = latest_snapshot_seq();

  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  wp_db->AdvanceSeqByOne();

  const auto seq_after = latest_snapshot_seq();
  ASSERT_LT(seq_before, seq_after);
}
// Test that the txn Initialize calls the overridden functions
TEST_P(WritePreparedTransactionTest, TxnInitialize) {
  TransactionOptions txn_options;
  WriteOptions write_options;
  ASSERT_OK(db->Put(write_options, "key", "value"));

  // Leave a prepared, uncommitted txn behind so that there is something
  // uncommitted when the next transaction takes its snapshot.
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
  ASSERT_OK(txn0->Prepare());

  // SetSnapshot is overridden to update min_uncommitted_
  txn_options.set_snapshot = true;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  auto snap = txn1->GetSnapshot();
  // Downcast within the class hierarchy; static_cast is the cast that is
  // defined for a base-to-derived conversion.
  auto snap_impl = static_cast<const SnapshotImpl*>(snap);
  // If ::Initialize calls the overridden SetSnapshot, min_uncommitted_ must be
  // updated
  ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);

  // A failed rollback is a test failure rather than something to ignore.
  ASSERT_OK(txn0->Rollback());
  ASSERT_OK(txn1->Rollback());
  delete txn0;
  delete txn1;
}
// This tests that transactions with duplicate keys perform correctly after max
// is advancing their prepared sequence numbers. This will not be the case if
// for example the txn does not add the prepared seq for the second sub-batch to
// the PreparedHeap structure.
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // disable commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  WriteOptions write_options;
  TransactionOptions txn_options;

  // Prepare a txn that writes the same key twice and leave it uncommitted.
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  ASSERT_OK(txn0->SetName("xid"));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value1")));
  ASSERT_OK(txn0->Put(Slice("key"), Slice("value2")));
  ASSERT_OK(txn0->Prepare());

  ASSERT_OK(db->Put(write_options, "key2", "value"));
  // Will cause max advance due to disabled commit cache
  ASSERT_OK(db->Put(write_options, "key3", "value"));

  // The prepared-only write must stay invisible.
  auto s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  delete txn0;

  // Simulate a crash and recover. Both the WAL flush and the reopen are
  // checked so that a failure is reported here instead of surfacing as a null
  // dereference further down (the former assert() is compiled out in release
  // builds).
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ASSERT_OK(ReOpenNoDelete());
  ASSERT_TRUE(db != nullptr);

  // The write must still be invisible after recovery.
  s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());

  // The prepared txn must have been recovered under its name.
  txn0 = db->GetTransactionByName("xid");
  ASSERT_TRUE(txn0 != nullptr);
  ASSERT_OK(txn0->Rollback());
  delete txn0;
}
#ifndef ROCKSDB_VALGRIND_RUN
// Stress SmallestUnCommittedSeq, which reads from both prepared_txns_ and
// delayed_prepared_, when it is run concurrently with advancing
// max_evicted_seq, which moves prepared txns from prepared_txns_ to
// delayed_prepared_.
TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // disable commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ReadOptions ropt;
  PinnableSlice pinnable_val;
  WriteOptions write_options;
  TransactionOptions txn_options;
  std::vector<Transaction*> txns, committed_txns;

  // Prepare cnt txns, each writing its own key, and keep them uncommitted.
  const int cnt = 100;
  for (int i = 0; i < cnt; i++) {
    Transaction* txn = db->BeginTransaction(write_options, txn_options);
    ASSERT_OK(txn->SetName("xid" + ToString(i)));
    auto key = "key1" + ToString(i);
    auto value = "value1" + ToString(i);
    ASSERT_OK(txn->Put(Slice(key), Slice(value)));
    ASSERT_OK(txn->Prepare());
    txns.push_back(txn);
  }

  // Guards txns, which is shared between the two threads below.
  port::Mutex mutex;
  Random rnd(1103);
  // Commits the prepared txns in random order.
  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    for (int i = 0; i < cnt; i++) {
      uint32_t index = rnd.Uniform(cnt - i);
      Transaction* txn;
      {
        MutexLock l(&mutex);
        txn = txns[index];
        txns.erase(txns.begin() + index);
      }
      // Since commit cache is practically disabled, commit results in immediate
      // advance in max_evicted_seq_ and subsequently moving some prepared txns
      // to delayed_prepared_.
      // The status is checked with the non-fatal EXPECT_OK: leaving this loop
      // early would keep txns non-empty and the read thread spinning forever.
      EXPECT_OK(txn->Commit());
      committed_txns.push_back(txn);
    }
  });
  // Keeps checking that the smallest uncommitted seq is not larger than the id
  // of the first txn still waiting to be committed.
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    while (1) {
      MutexLock l(&mutex);
      if (txns.empty()) {
        break;
      }
      auto min_uncommitted = wp_db->SmallestUnCommittedSeq();
      ASSERT_LE(min_uncommitted, (*txns.begin())->GetId());
    }
  });

  commit_thread.join();
  read_thread.join();
  for (auto txn : committed_txns) {
    delete txn;
  }
}
#endif // ROCKSDB_VALGRIND_RUN
// Run every combination of txn types concurrently, steering them into write
// groups through sync point callbacks, and check that the last visible
// sequence number matches the expected one right after the writes, after
// recovery, after a flush and after recovery from the flushed state.
TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
  // Given the sequential run of txns, with this timeout we should never see a
  // deadlock nor a timeout unless we have a key conflict, which should be
  // almost infeasible.
  txn_db_options.transaction_lock_timeout = 1000;
  txn_db_options.default_lock_timeout = 1000;
  ReOpen();
  FlushOptions fopt;

  // Number of different txn types we use in this test
  const size_t type_cnt = 5;
  // The size of the first write group
  // TODO(myabandeh): This should be increase for pre-release tests
  const size_t first_group_size = 2;
  // Total number of txns we run in each test
  // TODO(myabandeh): This should be increase for pre-release tests
  const size_t txn_cnt = first_group_size + 1;

  // base[i] holds type_cnt^i; it is used to extract the digits of the test
  // case number in base type_cnt.
  size_t base[txn_cnt + 1] = {
      1,
  };
  for (size_t bi = 1; bi <= txn_cnt; bi++) {
    base[bi] = base[bi - 1] * type_cnt;
  }
  // type_cnt^txn_cnt, taken from the integer table above instead of going
  // through floating point.
  const size_t max_n = base[txn_cnt];
  printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
  for (size_t n = 0; n < max_n; n++, ReOpen()) {
    // Only run the cases whose index modulo split_cnt_ equals split_id_.
    if (n % split_cnt_ != split_id_) continue;
    if (n % 1000 == 0) {
      printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
    }
    DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
    auto seq = db_impl->TEST_GetLastVisibleSequence();
    with_empty_commits = 0;
    exp_seq = seq;
    // This is increased before writing the batch for commit
    commit_writes = 0;
    // This is increased before txn starts linking if it expects to do a commit
    // eventually
    expected_commits = 0;
    std::vector<port::Thread> threads;

    linked = 0;
    std::atomic<bool> batch_formed(false);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::EnterAsBatchGroupLeader:End",
        [&](void* /*arg*/) { batch_formed = true; });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
          linked++;
          if (linked == 1) {
            // Wait until the others are linked too.
            while (linked < first_group_size) {
            }
          } else if (linked == 1 + first_group_size) {
            // Make the 2nd batch of the rest of writes plus any followup
            // commits from the first batch
            while (linked < txn_cnt + commit_writes) {
            }
          }
          // Then we will have one or more batches consisting of follow-up
          // commits from the 2nd batch. There is a bit of non-determinism here
          // but it should be tolerable.
        });

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    for (size_t bi = 0; bi < txn_cnt; bi++) {
      // get the bi-th digit in number system based on type_cnt
      size_t d = (n % base[bi + 1]) / base[bi];
      switch (d) {
        case 0:
          threads.emplace_back(txn_t0, bi);
          break;
        case 1:
          threads.emplace_back(txn_t1, bi);
          break;
        case 2:
          threads.emplace_back(txn_t2, bi);
          break;
        case 3:
          threads.emplace_back(txn_t3, bi);
          break;
        case 4:
          // NOTE(review): this reuses txn_t3, same as case 3 -- confirm
          // whether a distinct txn type was intended here.
          threads.emplace_back(txn_t3, bi);
          break;
        default:
          assert(false);
      }
      // wait to be linked
      while (linked.load() <= bi) {
      }
      // after a queue of size first_group_size
      if (bi + 1 == first_group_size) {
        while (!batch_formed) {
        }
        // to make it more deterministic, wait until the commits are linked
        while (linked.load() <= bi + expected_commits) {
        }
      }
    }
    for (auto& t : threads) {
      t.join();
    }
    if (options.two_write_queues) {
      // In this case none of the above scheduling tricks to deterministically
      // form merged batches works because the writes go to separate queues.
      // This would result in different write groups in each run of the test. We
      // still keep the test since although non-deterministic and hard to debug,
      // it is still useful to have.
      // TODO(myabandeh): Add a deterministic unit test for two_write_queues
    }

    // Check if memtable inserts advanced seq number as expected
    seq = db_impl->TEST_GetLastVisibleSequence();
    ASSERT_EQ(exp_seq, seq);

    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();

    // Check if recovery preserves the last sequence number.
    // The statuses of the WAL flush and of the reopen are checked so that an
    // I/O failure is not misreported as a sequence number mismatch.
    ASSERT_OK(db_impl->FlushWAL(true));
    ASSERT_OK(ReOpenNoDelete());
    ASSERT_TRUE(db != nullptr);
    db_impl = static_cast<DBImpl*>(db->GetRootDB());
    seq = db_impl->TEST_GetLastVisibleSequence();
    ASSERT_LE(exp_seq, seq + with_empty_commits);

    // Check if flush preserves the last sequence number
    ASSERT_OK(db_impl->Flush(fopt));
    seq = db_impl->GetLatestSequenceNumber();
    ASSERT_LE(exp_seq, seq + with_empty_commits);

    // Check if recovery after flush preserves the last sequence number
    ASSERT_OK(db_impl->FlushWAL(true));
    ASSERT_OK(ReOpenNoDelete());
    ASSERT_TRUE(db != nullptr);
    db_impl = static_cast<DBImpl*>(db->GetRootDB());
    seq = db_impl->GetLatestSequenceNumber();
    ASSERT_LE(exp_seq, seq + with_empty_commits);
  }
}
// Run a couple of different txns, some of them left uncommitted. Restart the
// db at a couple of points to check whether the list of uncommitted txns is
// recovered properly.
TEST_P(WritePreparedTransactionTest, BasicRecovery) {
  // Prepares named transactions, "crashes" the db, and verifies after each
  // reopen that exactly the still-uncommitted prepared txns show up in
  // delayed_prepared_ and that their data only becomes visible after commit.
  // The txn_t0..txn_t3 helpers are defined elsewhere in the fixture; they are
  // interleaved here presumably to mix in other kinds of writes.
  options.disable_auto_compactions = true;
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);

  txn_t0(0);

  TransactionOptions txn_options;
  WriteOptions write_options;
  size_t index = 1000;
  Transaction* txn0 = db->BeginTransaction(write_options, txn_options);
  auto istr0 = std::to_string(index);
  auto s = txn0->SetName("xid" + istr0);
  ASSERT_OK(s);
  s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
  ASSERT_OK(s);
  s = txn0->Prepare();
  // A failed prepare would invalidate every later expectation; fail early.
  ASSERT_OK(s);
  auto prep_seq_0 = txn0->GetId();

  txn_t1(0);

  index++;
  Transaction* txn1 = db->BeginTransaction(write_options, txn_options);
  auto istr1 = std::to_string(index);
  s = txn1->SetName("xid" + istr1);
  ASSERT_OK(s);
  s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
  ASSERT_OK(s);
  s = txn1->Prepare();
  ASSERT_OK(s);
  auto prep_seq_1 = txn1->GetId();

  txn_t2(0);

  ReadOptions ropt;
  PinnableSlice pinnable_val;
  // Check the value is not committed before restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  // Drop the handles (the prepared state lives in the WAL), then simulate a
  // crash and recover.
  delete txn0;
  delete txn1;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // After recovery, all the uncommitted txns (0 and 1) should be inserted into
  // delayed_prepared_
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_1, wp_db->max_evicted_seq_);
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_0) !=
                wp_db->delayed_prepared_.end());
    ASSERT_TRUE(wp_db->delayed_prepared_.find(prep_seq_1) !=
                wp_db->delayed_prepared_.end());
  }

  // Check the value is still not committed after restart
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.IsNotFound());
  pinnable_val.Reset();

  txn_t3(0);

  // Test that a recovered txn will be properly marked committed for the next
  // recovery
  txn1 = db->GetTransactionByName("xid" + istr1);
  ASSERT_NE(txn1, nullptr);
  ASSERT_OK(txn1->Commit());
  delete txn1;

  index++;
  Transaction* txn2 = db->BeginTransaction(write_options, txn_options);
  auto istr2 = std::to_string(index);
  s = txn2->SetName("xid" + istr2);
  ASSERT_OK(s);
  s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
  ASSERT_OK(s);
  s = txn2->Prepare();
  ASSERT_OK(s);
  auto prep_seq_2 = txn2->GetId();

  delete txn2;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  wp_db->TEST_Crash();
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_FALSE(wp_db->delayed_prepared_empty_);

  // 0 and 2 are prepared and 1 is committed
  {
    ReadLock rl(&wp_db->prepared_mutex_);
    ASSERT_EQ(2, wp_db->delayed_prepared_.size());
    const auto& end = wp_db->delayed_prepared_.end();
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_0), end);
    ASSERT_EQ(wp_db->delayed_prepared_.find(prep_seq_1), end);
    ASSERT_NE(wp_db->delayed_prepared_.find(prep_seq_2), end);
  }
  ASSERT_LE(prep_seq_0, wp_db->max_evicted_seq_);
  ASSERT_LE(prep_seq_2, wp_db->max_evicted_seq_);

  // Commit all the remaining txns
  txn0 = db->GetTransactionByName("xid" + istr0);
  ASSERT_NE(txn0, nullptr);
  ASSERT_OK(txn0->Commit());
  txn2 = db->GetTransactionByName("xid" + istr2);
  ASSERT_NE(txn2, nullptr);
  ASSERT_OK(txn2->Commit());

  // Check the value is committed after commit
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();

  // Final restart (no simulated crash this time): nothing should remain
  // prepared.
  delete txn0;
  delete txn2;
  ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
  ReOpenNoDelete();
  assert(db != nullptr);
  wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  ASSERT_TRUE(wp_db->prepared_txns_.empty());
  ASSERT_TRUE(wp_db->delayed_prepared_empty_);

  // Check the value is still committed after recovery
  s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
  ASSERT_TRUE(s.ok());
  ASSERT_TRUE(pinnable_val == ("bar0" + istr0));
  pinnable_val.Reset();
}
// After recovery the commit map is empty while the max is set. The code would
// go through a different path which requires a separate test. Test that the
// committed data before the restart is visible to all snapshots.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
  // Runs the scenario twice: once ending with plain committed writes and once
  // ending with an additional prepared-but-uncommitted transaction.
  for (bool end_with_prepare : {false, true}) {
    ReOpen();
    WriteOptions woptions;
    // Three committed writes to the same key.
    for (int put_idx = 0; put_idx < 3; put_idx++) {
      ASSERT_OK(db->Put(woptions, "key", "value"));
    }

    // Stays at kMaxSequenceNumber unless a prepared txn is added below.
    SequenceNumber prepare_seq = kMaxSequenceNumber;
    if (end_with_prepare) {
      TransactionOptions txn_options;
      Transaction* pending_txn = db->BeginTransaction(woptions, txn_options);
      ASSERT_OK(pending_txn->SetName("xid0"));
      ASSERT_OK(pending_txn->Prepare());
      prepare_seq = pending_txn->GetId();
      delete pending_txn;
    }

    // Simulated crash followed by recovery.
    dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
    auto root_db = reinterpret_cast<DBImpl*>(db->GetRootDB());
    root_db->FlushWAL(true);
    ReOpenNoDelete();
    WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
    assert(wp_db != nullptr);
    ASSERT_GT(wp_db->max_evicted_seq_, 0);  // max after recovery

    // Take a snapshot right after recovery
    const Snapshot* snap = db->GetSnapshot();
    auto snap_seq = snap->GetSequenceNumber();
    ASSERT_GT(snap_seq, 0);

    // NOTE(review): reaching prepare_seq ends the scan (it is part of the loop
    // condition), it does not merely skip that one sequence number.
    {
      SequenceNumber probe = 0;
      while (probe <= wp_db->max_evicted_seq_ && probe != prepare_seq) {
        ASSERT_TRUE(wp_db->IsInSnapshot(probe, snap_seq));
        probe++;
      }
    }
    if (end_with_prepare) {
      ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
    }
    // trivial check
    ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
    db->ReleaseSnapshot(snap);

    ASSERT_OK(db->Put(woptions, "key", "value"));
    // Take a snapshot after some writes
    snap = db->GetSnapshot();
    snap_seq = snap->GetSequenceNumber();
    {
      SequenceNumber probe = 0;
      while (probe <= wp_db->max_evicted_seq_ && probe != prepare_seq) {
        ASSERT_TRUE(wp_db->IsInSnapshot(probe, snap_seq));
        probe++;
      }
    }
    if (end_with_prepare) {
      ASSERT_FALSE(wp_db->IsInSnapshot(prepare_seq, snap_seq));
    }
    // trivial check
    ASSERT_FALSE(wp_db->IsInSnapshot(snap_seq + 1, snap_seq));
    db->ReleaseSnapshot(snap);
  }
}
// Shows the contract of IsInSnapshot when called on invalid/released snapshots
TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  WriteOptions woptions;
  ASSERT_OK(db->Put(woptions, "key", "value"));
  // snap seq = 1
  const Snapshot* older_snap = db->GetSnapshot();
  ASSERT_OK(db->Put(woptions, "key", "value"));
  ASSERT_OK(db->Put(woptions, "key", "value"));
  // snap seq = 3
  const Snapshot* newer_snap = db->GetSnapshot();

  const SequenceNumber seq = 1;
  // Evict seq out of commit cache
  size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
  wp_db->AddCommitted(overwrite_seq, overwrite_seq);

  uint64_t min_uncommitted = kMinUnCommittedSeq;
  bool released = false;
  SequenceNumber snap_seq = older_snap->GetSequenceNumber();
  ASSERT_LE(seq, snap_seq);
  // Valid snapshot lower than max
  ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  ASSERT_FALSE(released);

  released = false;
  snap_seq = older_snap->GetSequenceNumber();
  // Invalid snapshot lower than max (no snapshot was taken at snap_seq + 1)
  ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
  ASSERT_TRUE(
      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
  ASSERT_TRUE(released);

  db->ReleaseSnapshot(older_snap);

  released = false;
  // Released snapshot lower than max
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  // The release does not take effect until the next max advance
  ASSERT_FALSE(released);

  released = false;
  // Invalid snapshot lower than max
  ASSERT_TRUE(
      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
  ASSERT_TRUE(released);

  // This makes the snapshot release get reflected in the txn db structures
  wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
                              wp_db->max_evicted_seq_ + 1);

  released = false;
  // Released snapshot lower than max
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  ASSERT_TRUE(released);

  released = false;
  // Invalid snapshot lower than max
  ASSERT_TRUE(
      wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
  ASSERT_TRUE(released);

  snap_seq = newer_snap->GetSequenceNumber();

  released = false;
  // Unreleased snapshot lower than max
  ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
  ASSERT_FALSE(released);

  db->ReleaseSnapshot(newer_snap);
}
// Test WritePreparedTxnDB's IsInSnapshot against different orderings of the
// snapshot, max_evicted_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshot) {
// Drives a WritePreparedTxnDBMock purely through AddPrepared / AddCommitted /
// RemovePrepared / TakeSnapshot with mocked sequence numbers, and compares
// IsInSnapshot() against a locally maintained model (committed_before).
WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_
const size_t commit_cache_bits = 3;
// Same for snapshot cache size
const size_t snapshot_cache_bits = 2;
// Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when
// only a few snapshots are below the max_evicted_seq_.
for (int max_snapshots = 1; max_snapshots < 20; max_snapshots++) {
// Leave some gap between the preliminary snapshots and the final snapshot
// that we check. This should test for also different overlapping scenarios
// between the last snapshot and the commits.
for (int max_gap = 1; max_gap < 10; max_gap++) {
// Since we do not actually write to db, we mock the seq as it would be
// increased by the db. The only exception is that we need db seq to
// advance for our snapshots. for which we apply a dummy put each time we
// increase our mock of seq.
uint64_t seq = 0;
// At each step we prepare a txn and then we commit it in the next txn.
// This emulates the consecutive transactions that write to the same key
uint64_t cur_txn = 0;
// Number of snapshots taken so far
int num_snapshots = 0;
// Number of gaps applied so far
int gap_cnt = 0;
// The final snapshot that we will inspect
uint64_t snapshot = 0;
bool found_committed = false;
// To stress the data structure that maintain prepared txns, at each cycle
// we add a new prepare txn. These do not mean to be committed for
// snapshot inspection.
std::set<uint64_t> prepared;
// We keep the list of txns committed before we take the last snapshot.
// These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before;
// The set of commit seq numbers to be excluded from IsInSnapshot queries
std::set<uint64_t> commit_seqs;
// NOTE(review): mock_db is allocated with a raw new and never deleted here;
// presumably WritePreparedTxnDBMock takes ownership of it -- confirm.
DBImpl* mock_db = new DBImpl(options, dbname);
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
// We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction
seq++;
wp_db->AddPrepared(seq);
prepared.insert(seq);
// If cur_txn is not started, do prepare for it.
if (!cur_txn) {
seq++;
cur_txn = seq;
wp_db->AddPrepared(cur_txn);
} else {                                     // else commit it
seq++;
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
commit_seqs.insert(seq);
// Only txns committed before the final snapshot exists are expected to
// be visible to it.
if (!snapshot) {
committed_before.insert(cur_txn);
}
cur_txn = 0;
}
// Exactly one of the following happens per cycle: take a preliminary
// snapshot, count one gap step, or take the final snapshot.
if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots
wp_db->TakeSnapshot(seq);
num_snapshots++;
} else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot
gap_cnt++;
} else if (!snapshot) {
// Take the final snapshot if it is not already taken
snapshot = seq;
wp_db->TakeSnapshot(snapshot);
num_snapshots++;
}
// If the snapshot is taken, verify seq numbers visible to it. We redo
// it at each cycle to test that the system is still sound when
// max_evicted_seq_ advances.
if (snapshot) {
// NOTE(review): the commit_seqs lookup is part of the loop condition, so
// the scan stops at the first s that is a commit seq instead of skipping
// it and continuing up to seq. If the intent was only to exclude commit
// seqs from the queries, this checks far fewer values than intended --
// confirm before changing, since widening the scan may alter results.
for (uint64_t s = 1;
s <= seq && commit_seqs.find(s) == commit_seqs.end(); s++) {
bool was_committed =
(committed_before.find(s) != committed_before.end());
bool is_in_snapshot = wp_db->IsInSnapshot(s, snapshot);
// Print the full scenario state before the assertion below fires, to
// make a mismatch debuggable.
if (was_committed != is_in_snapshot) {
printf("max_snapshots %d max_gap %d seq %" PRIu64 " max %" PRIu64
" snapshot %" PRIu64
" gap_cnt %d num_snapshots %d s %" PRIu64 "\n",
max_snapshots, max_gap, seq,
wp_db->max_evicted_seq_.load(), snapshot, gap_cnt,
num_snapshots, s);
}
ASSERT_EQ(was_committed, is_in_snapshot);
found_committed = found_committed || is_in_snapshot;
}
}
}
// Safety check to make sure the test actually ran
ASSERT_TRUE(found_committed);
// As an extra check, check if prepared set will be properly empty after
// they are committed.
if (cur_txn) {
wp_db->AddCommitted(cur_txn, seq);
wp_db->RemovePrepared(cur_txn);
}
// Commit every stress-only prepared entry as well, so that both prepared
// containers can be asserted empty below.
for (auto p : prepared) {
wp_db->AddCommitted(p, seq);
wp_db->RemovePrepared(p);
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
}
}
}
// Reads `key` from the default column family through both Get and MultiGet
// and asserts that each lookup yields status `exp_s` (which must be OK or
// NotFound) and, when OK, the value `exp_v`.
void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
                 PinnableSlice& exp_v, Slice key) {
  // Single-key lookup.
  PinnableSlice get_value;
  Status get_status =
      db->Get(roptions, db->DefaultColumnFamily(), key, &get_value);
  ASSERT_TRUE(exp_s == get_status);
  ASSERT_TRUE(get_status.ok() || get_status.IsNotFound());
  if (get_status.ok()) {
    ASSERT_TRUE(exp_v == get_value);
  }

  // Try with MultiGet API too
  std::vector<std::string> multi_values;
  auto multi_statuses = db->MultiGet(roptions, {db->DefaultColumnFamily()},
                                     {key}, &multi_values);
  ASSERT_EQ(1, multi_values.size());
  ASSERT_EQ(1, multi_statuses.size());
  Status multi_status = multi_statuses[0];
  ASSERT_TRUE(exp_s == multi_status);
  ASSERT_TRUE(multi_status.ok() || multi_status.IsNotFound());
  if (multi_status.ok()) {
    ASSERT_TRUE(exp_v == multi_values[0]);
  }
}
// Convenience overload: same check as the five-argument ASSERT_SAME, using
// default-constructed ReadOptions.
void ASSERT_SAME(TransactionDB* db, Status exp_s, PinnableSlice& exp_v,
                 Slice key) {
  ReadOptions default_read_options;
  ASSERT_SAME(default_read_options, db, exp_s, exp_v, key);
}
TEST_P(WritePreparedTransactionTest, Rollback) {
  // For each (key, kind of pre-existing entry, crash-or-not) combination:
  // optionally seed one of key1..key4, prepare a txn touching all four keys,
  // optionally crash and recover, roll the txn back, and check at every step
  // that reads still return what they returned before the txn started.
  ReadOptions roptions;
  WriteOptions woptions;
  TransactionOptions txn_options;
  const size_t num_keys = 4;
  const size_t num_values = 5;
  for (size_t ikey = 1; ikey <= num_keys; ikey++) {
    for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
      for (bool crash : {false, true}) {
        ReOpen();
        WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
        std::string key_str = "key" + ToString(ikey);
        // ivalue selects what (if anything) exists for key_str beforehand.
        switch (ivalue) {
          case 0:
            break;
          case 1:
            ASSERT_OK(db->Put(woptions, key_str, "initvalue1"));
            break;
          case 2:
            ASSERT_OK(db->Merge(woptions, key_str, "initvalue2"));
            break;
          case 3:
            ASSERT_OK(db->Delete(woptions, key_str));
            break;
          case 4:
            ASSERT_OK(db->SingleDelete(woptions, key_str));
            break;
          default:
            assert(0);
        }

        // Record the baseline status/value of every key before the txn.
        PinnableSlice v1;
        auto s1 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key1"), &v1);
        PinnableSlice v2;
        auto s2 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key2"), &v2);
        PinnableSlice v3;
        auto s3 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key3"), &v3);
        PinnableSlice v4;
        auto s4 =
            db->Get(roptions, db->DefaultColumnFamily(), Slice("key4"), &v4);

        Transaction* txn = db->BeginTransaction(woptions, txn_options);
        auto s = txn->SetName("xid0");
        ASSERT_OK(s);
        s = txn->Put(Slice("key1"), Slice("value1"));
        ASSERT_OK(s);
        s = txn->Merge(Slice("key2"), Slice("value2"));
        ASSERT_OK(s);
        s = txn->Delete(Slice("key3"));
        ASSERT_OK(s);
        s = txn->SingleDelete(Slice("key4"));
        ASSERT_OK(s);
        s = txn->Prepare();
        ASSERT_OK(s);

        {
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_FALSE(wp_db->prepared_txns_.empty());
          ASSERT_EQ(txn->GetId(), wp_db->prepared_txns_.top());
        }

        // Prepared-but-uncommitted data must not be visible.
        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");

        if (crash) {
          delete txn;
          auto db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
          ASSERT_OK(db_impl->FlushWAL(true));
          dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
          ReOpenNoDelete();
          assert(db != nullptr);
          wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
          txn = db->GetTransactionByName("xid0");
          // txn is dereferenced below; fail cleanly if it was not recovered.
          ASSERT_NE(txn, nullptr);
          ASSERT_FALSE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_FALSE(wp_db->delayed_prepared_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.find(txn->GetId()) !=
                      wp_db->delayed_prepared_.end());
        }

        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");

        s = txn->Rollback();
        ASSERT_OK(s);

        // After rollback no prepared entry may remain in either container.
        {
          ASSERT_TRUE(wp_db->delayed_prepared_empty_);
          ReadLock rl(&wp_db->prepared_mutex_);
          ASSERT_TRUE(wp_db->prepared_txns_.empty());
          ASSERT_TRUE(wp_db->delayed_prepared_.empty());
        }

        ASSERT_SAME(db, s1, v1, "key1");
        ASSERT_SAME(db, s2, v2, "key2");
        ASSERT_SAME(db, s3, v3, "key3");
        ASSERT_SAME(db, s4, v4, "key4");
        delete txn;
      }
    }
  }
}
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
  // Writes 1024 versions of one key, then reopens with a much smaller write
  // buffer and checks that every version is still present afterwards.
  // Use large buffer to avoid memtable flush after 1024 insertions
  options.write_buffer_size = 1024 * 1024;
  ReOpen();
  std::vector<KeyVersion> versions;
  uint64_t seq = 0;
  for (uint64_t i = 1; i <= 1024; i++) {
    std::string v = "bar" + ToString(i);
    ASSERT_OK(db->Put(WriteOptions(), "foo", v));
    VerifyKeys({{"foo", v}});
    seq++;  // one for the key/value
    KeyVersion kv = {"foo", v, seq, kTypeValue};
    if (options.two_write_queues) {
      seq++;  // one for the commit
    }
    versions.emplace_back(kv);
  }
  // Versions were collected oldest-first; VerifyInternalKeys is given them
  // newest-first.
  std::reverse(std::begin(versions), std::end(versions));
  VerifyInternalKeys(versions);
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  // The recovery below depends on the WAL contents; do not ignore a failure.
  ASSERT_OK(db_impl->FlushWAL(true));
  // Use small buffer to ensure memtable flush during recovery
  options.write_buffer_size = 1024;
  ReOpenNoDelete();
  VerifyInternalKeys(versions);
}
TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
  WriteOptions wopts;
  ASSERT_OK(db->Put(wopts, "foo", "bar"));
  VerifyKeys({{"foo", "bar"}});
  const Snapshot* pinned = db->GetSnapshot();
  FlushOptions flush_opts;
  ASSERT_OK(db->Flush(flush_opts));
  // Dummy keys on both sides of "foo" keep the compaction from degenerating
  // into a trivial file move, so the real compaction logic runs.
  ASSERT_OK(db->Put(wopts, "a", "dummy"));
  ASSERT_OK(db->Put(wopts, "z", "dummy"));
  CompactRangeOptions compact_opts;
  ASSERT_OK(db->CompactRange(compact_opts, nullptr, nullptr));
  // Compaction will output keys with sequence number 0 if they are visible to
  // the earliest snapshot. Make sure IsInSnapshot() reports sequence number 0
  // as visible to any snapshot.
  VerifyKeys({{"foo", "bar"}});
  VerifyKeys({{"foo", "bar"}}, pinned);
  VerifyInternalKeys({{"foo", "bar", 0, kTypeValue}});
  db->ReleaseSnapshot(pinned);
}
// Compaction should not remove a key if it is not committed, and should
// proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  WriteOptions wopts;
  FlushOptions flush_opts;
  // Snapshots held so that keys do not get evicted.
  std::vector<const Snapshot*> held_snapshots;
  // Running model of the sequence number the db should have reached.
  SequenceNumber expected_seq = 0;
  // Performs one write, advances the model, checks it against the db and pins
  // the resulting state with a snapshot.
  auto add_key = [&](std::function<Status()> write_op) {
    ASSERT_OK(write_op());
    expected_seq += 1;
    if (options.two_write_queues) {
      expected_seq += 1;  // 1 for commit
    }
    ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
    held_snapshots.push_back(db->GetSnapshot());
  };

  // Each key here represents a standalone test case.
  add_key([&]() { return db->Put(wopts, "key1", "value1_1"); });
  add_key([&]() { return db->Put(wopts, "key2", "value2_1"); });
  add_key([&]() { return db->Put(wopts, "key3", "value3_1"); });
  add_key([&]() { return db->Put(wopts, "key4", "value4_1"); });
  add_key([&]() { return db->Merge(wopts, "key5", "value5_1"); });
  add_key([&]() { return db->Merge(wopts, "key5", "value5_2"); });
  add_key([&]() { return db->Put(wopts, "key6", "value6_1"); });
  add_key([&]() { return db->Put(wopts, "key7", "value7_1"); });
  ASSERT_OK(db->Flush(flush_opts));
  add_key([&]() { return db->Delete(wopts, "key6"); });
  add_key([&]() { return db->SingleDelete(wopts, "key7"); });

  auto* pending_txn = db->BeginTransaction(wopts);
  ASSERT_OK(pending_txn->SetName("txn"));
  ASSERT_OK(pending_txn->Put("key1", "value1_2"));
  ASSERT_OK(pending_txn->Delete("key2"));
  ASSERT_OK(pending_txn->SingleDelete("key3"));
  ASSERT_OK(pending_txn->Merge("key4", "value4_2"));
  ASSERT_OK(pending_txn->Merge("key5", "value5_3"));
  ASSERT_OK(pending_txn->Put("key6", "value6_2"));
  ASSERT_OK(pending_txn->Put("key7", "value7_2"));
  // Prepare but do not commit.
  ASSERT_OK(pending_txn->Prepare());
  expected_seq += 1;
  ASSERT_EQ(expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(db->Flush(flush_opts));
  for (const Snapshot* held : held_snapshots) {
    db->ReleaseSnapshot(held);
  }

  // Dummy keys keep the compaction from degenerating into a trivial file move,
  // so the real compaction logic runs.
  ASSERT_OK(db->Put(wopts, "a", "dummy"));
  ASSERT_OK(db->Put(wopts, "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  // Readers still see only the state from before the prepared txn.
  VerifyKeys({
      {"key1", "value1_1"},
      {"key2", "value2_1"},
      {"key3", "value3_1"},
      {"key4", "value4_1"},
      {"key5", "value5_1,value5_2"},
      {"key6", "NOT_FOUND"},
      {"key7", "NOT_FOUND"},
  });
  // The prepared entries are expected to still be there at expected_seq.
  VerifyInternalKeys({
      {"key1", "value1_2", expected_seq, kTypeValue},
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "", expected_seq, kTypeDeletion},
      {"key2", "value2_1", 0, kTypeValue},
      {"key3", "", expected_seq, kTypeSingleDeletion},
      {"key3", "value3_1", 0, kTypeValue},
      {"key4", "value4_2", expected_seq, kTypeMerge},
      {"key4", "value4_1", 0, kTypeValue},
      {"key5", "value5_3", expected_seq, kTypeMerge},
      {"key5", "value5_1,value5_2", 0, kTypeValue},
      {"key6", "value6_2", expected_seq, kTypeValue},
      {"key7", "value7_2", expected_seq, kTypeValue},
  });
  ASSERT_OK(pending_txn->Commit());
  // Once committed, the txn's writes become the visible state.
  VerifyKeys({
      {"key1", "value1_2"},
      {"key2", "NOT_FOUND"},
      {"key3", "NOT_FOUND"},
      {"key4", "value4_1,value4_2"},
      {"key5", "value5_1,value5_2,value5_3"},
      {"key6", "value6_2"},
      {"key7", "value7_2"},
  });
  delete pending_txn;
}
// Compaction should keep keys visible to a snapshot based on commit sequence,
// not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
  options.disable_auto_compactions = true;
  ReOpen();
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  WriteOptions wopts;
  // Running model of the sequence number the db should have reached.
  SequenceNumber expected_seq = 0;

  auto* first_txn = db->BeginTransaction(wopts);
  ASSERT_OK(first_txn->SetName("txn1"));
  ASSERT_OK(first_txn->Put("key1", "value1_1"));
  ASSERT_OK(first_txn->Prepare());
  expected_seq += 1;
  ASSERT_EQ(expected_seq, db->GetLatestSequenceNumber());
  ASSERT_OK(first_txn->Commit());
  expected_seq += 1;
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete first_txn;
  // Take a snapshot to avoid keys getting evicted before compaction.
  const Snapshot* snapshot1 = db->GetSnapshot();

  auto* second_txn = db->BeginTransaction(wopts);
  ASSERT_OK(second_txn->SetName("txn2"));
  ASSERT_OK(second_txn->Put("key2", "value2_1"));
  ASSERT_OK(second_txn->Prepare());
  expected_seq += 1;
  ASSERT_EQ(expected_seq, db->GetLatestSequenceNumber());
  // txn1 committed before snapshot2, so it is visible to snapshot2.
  // txn2 commits after snapshot2, so it is not visible to it.
  const Snapshot* snapshot2 = db->GetSnapshot();
  ASSERT_OK(second_txn->Commit());
  expected_seq += 1;
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());
  delete second_txn;
  // Take a snapshot to avoid keys getting evicted before compaction.
  const Snapshot* snapshot3 = db->GetSnapshot();

  ASSERT_OK(db->Put(wopts, "key1", "value1_2"));
  expected_seq += 1;  // 1 for write
  const SequenceNumber seq1 = expected_seq;
  if (options.two_write_queues) {
    expected_seq += 1;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());

  ASSERT_OK(db->Put(wopts, "key2", "value2_2"));
  expected_seq += 1;  // 1 for write
  const SequenceNumber seq2 = expected_seq;
  if (options.two_write_queues) {
    expected_seq += 1;  // 1 for commit
  }
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());

  ASSERT_OK(db->Flush(FlushOptions()));
  db->ReleaseSnapshot(snapshot1);
  db->ReleaseSnapshot(snapshot3);
  // Dummy keys keep the compaction from degenerating into a trivial file move,
  // so the real compaction logic runs.
  ASSERT_OK(db->Put(wopts, "a", "dummy"));
  ASSERT_OK(db->Put(wopts, "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
  VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
  VerifyInternalKeys({
      {"key1", "value1_2", seq1, kTypeValue},
      // "value1_1" is visible to snapshot2. Also keys at bottom level visible
      // to earliest snapshot will output with seq = 0.
      {"key1", "value1_1", 0, kTypeValue},
      {"key2", "value2_2", seq2, kTypeValue},
  });
  db->ReleaseSnapshot(snapshot2);
}
TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
  // Checks the min_uncommitted_ value recorded in snapshots taken while a
  // prepared transaction is outstanding, with and without a second, more
  // recent prepared transaction.
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // disable commit cache
  for (bool has_recent_prepare : {true, false}) {
    UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
    ReOpen();

    ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
    auto* transaction =
        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
    ASSERT_OK(transaction->SetName("txn"));
    ASSERT_OK(transaction->Delete("key1"));
    ASSERT_OK(transaction->Prepare());
    // snapshot1 should get min_uncommitted from prepared_txns_ heap.
    auto snapshot1 = db->GetSnapshot();
    ASSERT_EQ(transaction->GetId(),
              static_cast<const SnapshotImpl*>(snapshot1)->min_uncommitted_);
    // Add a commit to advance max_evicted_seq and move the prepared transaction
    // into delayed_prepared_ set.
    ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
    Transaction* txn2 = nullptr;
    if (has_recent_prepare) {
      txn2 =
          db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
      ASSERT_OK(txn2->SetName("txn2"));
      ASSERT_OK(txn2->Put("key3", "value3"));
      ASSERT_OK(txn2->Prepare());
    }
    // snapshot2 should get min_uncommitted from delayed_prepared_ set.
    // The first transaction is still the oldest uncommitted one, so snapshot2
    // must report its id too. (This previously re-checked snapshot1.)
    auto snapshot2 = db->GetSnapshot();
    ASSERT_EQ(transaction->GetId(),
              static_cast<const SnapshotImpl*>(snapshot2)->min_uncommitted_);
    ASSERT_OK(transaction->Commit());
    delete transaction;
    if (has_recent_prepare) {
      ASSERT_OK(txn2->Commit());
      delete txn2;
    }
    // The delete is visible to new readers but to neither snapshot, since both
    // were taken before the commit.
    VerifyKeys({{"key1", "NOT_FOUND"}});
    VerifyKeys({{"key1", "value1"}}, snapshot1);
    VerifyKeys({{"key1", "value1"}}, snapshot2);
    db->ReleaseSnapshot(snapshot1);
    db->ReleaseSnapshot(snapshot2);
  }
}
// Insert two values, v1 and v2, for a key. Between prepare and commit of v2
// take two snapshots, s1 and s2. Release s1 during compaction.
// Test to make sure compaction doesn't get confused and think s1 can see both
// values, and thus compact out the older value by mistake.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
  auto* transaction =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(transaction->SetName("txn"));
  ASSERT_OK(transaction->Put("key1", "value1_2"));
  ASSERT_OK(transaction->Prepare());
  auto snapshot1 = db->GetSnapshot();
  // Increment sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  auto snapshot2 = db->GetSnapshot();
  ASSERT_OK(transaction->Commit());
  delete transaction;
  // Both snapshots predate the commit, so they must keep seeing value1_1.
  VerifyKeys({{"key1", "value1_2"}});
  VerifyKeys({{"key1", "value1_1"}}, snapshot1);
  VerifyKeys({{"key1", "value1_1"}}, snapshot2);

  auto callback = [&](void*) {
    // Release snapshot1 after CompactionIterator init.
    // CompactionIterator needs to figure out that the earliest snapshot
    // that can see key1:value1_2 is kMaxSequenceNumber, not
    // snapshot1 or snapshot2.
    db->ReleaseSnapshot(snapshot1);
    // Add some keys to advance max_evicted_seq.
    ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
    ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  // Use a flush (rather than a manual compaction) so that the work cannot
  // fall back to a trivial move.
  ASSERT_OK(db->Flush(FlushOptions()));
  VerifyKeys({{"key1", "value1_2"}});
  VerifyKeys({{"key1", "value1_1"}}, snapshot2);
  db->ReleaseSnapshot(snapshot2);
  // Turn sync-point processing off again before dropping the callbacks, as
  // other tests in this file do, so no global state leaks out of this test.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Insert two values, v1 and v2, for a key. Take two snapshots, s1 and s2,
// after committing v2. Release s1 during compaction, right after compaction
// processes v2 and before processes v1. Test to make sure compaction doesn't
// get confused and believe v1 and v2 are visible to different snapshot
// (v1 by s2, v2 by s1) and refuse to compact out v1.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 0;    // minimum commit cache
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
  SequenceNumber v2_seq = db->GetLatestSequenceNumber();
  auto* s1 = db->GetSnapshot();
  // Advance sequence number.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "dummy"));
  auto* s2 = db->GetSnapshot();

  // Counts how many "key1" entries the sync point has reported so far.
  int count_value = 0;
  auto callback = [&](void* arg) {
    auto* ikey = reinterpret_cast<ParsedInternalKey*>(arg);
    if (ikey->user_key == "key1") {
      count_value++;
      if (count_value == 2) {
        // Processing v1.
        db->ReleaseSnapshot(s1);
        // Add some keys to advance max_evicted_seq and update
        // old_commit_map.
        ASSERT_OK(db->Put(WriteOptions(), "key3", "dummy"));
        ASSERT_OK(db->Put(WriteOptions(), "key4", "dummy"));
      }
    }
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:ProcessKV",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compacted out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});

  // cleanup
  db->ReleaseSnapshot(s2);
  // Turn sync-point processing off again before dropping the callbacks, as
  // other tests in this file do, so no global state leaks out of this test.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Insert two values, v1 and v2, for a key. Insert another dummy key
// so as to evict the commit cache entry for v2, while v1 is still in the
// commit cache. Take two snapshots, s1 and s2. Release s1 during compaction.
// Since the commit cache entry for v2 is evicted, and old_commit_map doesn't
// have s1 (it is released), the test checks that v1 is still compacted out.
// TODO(myabandeh): how can we be sure that v2's commit info is evicted
// (and not v1's)? Instead of putting a dummy, we can directly call
// AddCommitted(v2_seq + cache_size, ...) to evict v2's entry from commit cache.
TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 1;    // commit cache size = 2
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  // Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
  // It also advances max_evicted_seq and can trigger old_commit_map cleanup.
  auto add_dummy = [&]() {
    auto* txn_dummy =
        db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
    ASSERT_OK(txn_dummy->SetName("txn_dummy"));
    ASSERT_OK(txn_dummy->Put("dummy", "dummy"));
    ASSERT_OK(txn_dummy->Prepare());
    ASSERT_OK(txn_dummy->Commit());
    delete txn_dummy;
  };

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* txn =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());
  // Sequence number right after the prepare; used below as the expected
  // sequence number of the value2 entry.
  // TODO(myabandeh): replace it with GetId()?
  auto v2_seq = db->GetLatestSequenceNumber();
  ASSERT_OK(txn->Commit());
  delete txn;
  auto* s1 = db->GetSnapshot();
  // Dummy key to advance sequence number.
  add_dummy();
  auto* s2 = db->GetSnapshot();

  auto callback = [&](void*) {
    db->ReleaseSnapshot(s1);
    // Add some dummy entries to trigger s1 being cleaned up from
    // old_commit_map.
    add_dummy();
    add_dummy();
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(db->Flush(FlushOptions()));
  // value1 should be compacted out.
  VerifyInternalKeys({{"key1", "value2", v2_seq, kTypeValue}});

  db->ReleaseSnapshot(s2);
  // Turn sync-point processing off again before dropping the callbacks, as
  // other tests in this file do, so no global state leaks out of this test.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
  const size_t commit_cache_bits = 0;    // minimum commit cache
  const size_t snapshot_cache_bits = 7;  // same as default
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();

  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));

  // Prepare a deletion of key1; it is committed only after both snapshots
  // below have been taken.
  auto* del_txn =
      db->BeginTransaction(WriteOptions(), TransactionOptions(), nullptr);
  ASSERT_OK(del_txn->SetName("txn"));
  ASSERT_OK(del_txn->Delete("key1"));
  ASSERT_OK(del_txn->Prepare());
  const SequenceNumber del_seq = db->GetLatestSequenceNumber();

  auto earliest_snapshot = db->GetSnapshot();
  // Bump the sequence number between the two snapshots.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  auto later_snapshot = db->GetSnapshot();

  ASSERT_OK(del_txn->Commit());
  delete del_txn;

  // The latest view sees the delete; both snapshots still see value1.
  VerifyKeys({{"key1", "NOT_FOUND"}});
  VerifyKeys({{"key1", "value1"}}, earliest_snapshot);
  VerifyKeys({{"key1", "value1"}}, later_snapshot);
  ASSERT_OK(db->Flush(FlushOptions()));

  // Release the earliest snapshot after CompactionIterator init. The
  // CompactionIterator needs to double check and find out that the later
  // snapshot is now the earliest existing one.
  auto release_earliest = [&](void* compaction) {
    if (compaction == nullptr) {
      return;
    }
    db->ReleaseSnapshot(earliest_snapshot);
    // Extra writes to advance max_evicted_seq.
    ASSERT_OK(db->Put(WriteOptions(), "key3", "value3"));
    ASSERT_OK(db->Put(WriteOptions(), "key4", "value4"));
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        release_earliest);
  SyncPoint::GetInstance()->EnableProcessing();

  // Dummy keys on both ends of the key space so the compaction cannot be a
  // trivial file move that bypasses the actual compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Only key1 is verified. Both its put and its delete must be kept: the
  // tombstone is not visible to the later snapshot, so at least one version
  // of the key has to stay for the write-conflict check.
  VerifyInternalKeys({{"key1", "", del_seq, kTypeDeletion},
                      {"key1", "value1", 0, kTypeValue}});

  db->ReleaseSnapshot(later_snapshot);
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,
       CompactionKeepSnapshotVisibleKeysRandomized) {
  constexpr size_t kNumTransactions = 10;
  constexpr size_t kNumIterations = 1000;

  // Slot r holds the in-flight (prepared, not yet committed) transaction for
  // "key<r>", or nullptr; versions[r] is the last committed version of that
  // key.
  std::vector<Transaction*> transactions(kNumTransactions, nullptr);
  std::vector<size_t> versions(kNumTransactions, 0);
  // Expected committed state of the DB.
  std::unordered_map<std::string, std::string> current_data;
  // snapshots[i] is expected to observe exactly snapshot_data[i].
  std::vector<const Snapshot*> snapshots;
  std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
  Random rnd(1103);
  options.disable_auto_compactions = true;
  ReOpen();

  // Checks every recorded snapshot against the data captured when it was
  // taken.
  auto verify_all_snapshots = [&]() {
    for (size_t i = 0; i < snapshots.size(); i++) {
      VerifyKeys(snapshot_data[i], snapshots[i]);
    }
  };

  for (size_t i = 0; i < kNumTransactions; i++) {
    std::string key = "key" + ToString(i);
    std::string value = "value0";
    ASSERT_OK(db->Put(WriteOptions(), key, value));
    current_data[key] = value;
  }
  VerifyKeys(current_data);

  for (size_t iter = 0; iter < kNumIterations; iter++) {
    // r in [0, kNumTransactions) advances the transaction in slot r;
    // r == kNumTransactions takes a snapshot instead.
    auto r = rnd.Next() % (kNumTransactions + 1);
    if (r < kNumTransactions) {
      std::string key = "key" + ToString(r);
      if (transactions[r] == nullptr) {
        // No in-flight transaction for this key: prepare the next version.
        std::string value = "value" + ToString(versions[r] + 1);
        auto* txn = db->BeginTransaction(WriteOptions());
        ASSERT_OK(txn->SetName("txn" + ToString(r)));
        ASSERT_OK(txn->Put(key, value));
        ASSERT_OK(txn->Prepare());
        transactions[r] = txn;
      } else {
        // Commit the prepared version and record it as the current value.
        std::string value = "value" + ToString(++versions[r]);
        ASSERT_OK(transactions[r]->Commit());
        delete transactions[r];
        transactions[r] = nullptr;
        current_data[key] = value;
      }
    } else {
      auto* snapshot = db->GetSnapshot();
      VerifyKeys(current_data, snapshot);
      snapshots.push_back(snapshot);
      snapshot_data.push_back(current_data);
    }
    VerifyKeys(current_data);
  }

  // Take a last snapshot to test compaction with uncommitted prepared
  // transaction.
  snapshots.push_back(db->GetSnapshot());
  snapshot_data.push_back(current_data);

  // Use a gtest assertion rather than assert() so the bookkeeping check is
  // also enforced in builds that define NDEBUG.
  ASSERT_EQ(snapshots.size(), snapshot_data.size());

  verify_all_snapshots();
  ASSERT_OK(db->Flush(FlushOptions()));
  verify_all_snapshots();

  // Dummy keys to avoid compaction trivially move files and get around actual
  // compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  verify_all_snapshots();

  // cleanup: commit whatever is still prepared and release all snapshots.
  for (size_t i = 0; i < kNumTransactions; i++) {
    if (transactions[i] == nullptr) {
      continue;
    }
    ASSERT_OK(transactions[i]->Commit());
    delete transactions[i];
  }
  for (size_t i = 0; i < snapshots.size(); i++) {
    db->ReleaseSnapshot(snapshots[i]);
  }
}
// Compaction should not apply the optimization to output key with sequence
// number equal to 0 if the key is not visible to earliest snapshot, based on
// commit sequence number.
TEST_P(WritePreparedTransactionTest,
       CompactionShouldKeepSequenceForUncommittedKeys) {
  options.disable_auto_compactions = true;
  ReOpen();

  // Running tally of the sequence number the DB is expected to be at.
  SequenceNumber expected_seq = 0;

  // key1 is written by a transaction that stays prepared (uncommitted) until
  // after the compaction.
  auto* pending_txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(pending_txn->SetName("txn"));
  ASSERT_OK(pending_txn->Put("key1", "value1"));
  ASSERT_OK(pending_txn->Prepare());
  expected_seq++;
  ASSERT_EQ(expected_seq, db->GetLatestSequenceNumber());
  const SequenceNumber seq1 = expected_seq;

  // key2 is written and committed directly.
  ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
  // One sequence number for the data, plus one for the commit when two write
  // queues are in use.
  expected_seq += options.two_write_queues ? 2 : 1;
  ASSERT_EQ(expected_seq, db_impl->TEST_GetLastVisibleSequence());

  ASSERT_OK(db->Flush(FlushOptions()));
  // Bracket the key space with dummy keys so that compaction cannot simply
  // move files and skip the actual compaction logic.
  ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
  ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  VerifyKeys({
      {"key1", "NOT_FOUND"},
      {"key2", "value2"},
  });
  VerifyInternalKeys({
      // Uncommitted "key1" must retain its original sequence number.
      {"key1", "value1", seq1, kTypeValue},
      // Committed "key2" is output with its sequence number zeroed.
      {"key2", "value2", 0, kTypeValue},
  });

  // Once committed, key1 becomes visible.
  ASSERT_OK(pending_txn->Commit());
  VerifyKeys({
      {"key1", "value1"},
      {"key2", "value2"},
  });
  delete pending_txn;
}
// Takes a snapshot and commits a prepared transaction from inside the flush
// (at "CompactionIterator:AfterInit"), then checks that the older value is
// still readable through that snapshot.
TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
  options.disable_auto_compactions = true;
  ReOpen();

  const Snapshot* snapshot = nullptr;
  ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
  auto* txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(txn->SetName("txn"));
  ASSERT_OK(txn->Put("key1", "value2"));
  ASSERT_OK(txn->Prepare());

  auto callback = [&](void*) {
    // Snapshot is taken after compaction start. It should be taken into
    // consideration for whether to compact out value1.
    snapshot = db->GetSnapshot();
    ASSERT_OK(txn->Commit());
    delete txn;
  };
  SyncPoint::GetInstance()->SetCallBack("CompactionIterator:AfterInit",
                                        callback);
  SyncPoint::GetInstance()->EnableProcessing();
  const auto flush_status = db->Flush(FlushOptions());
  // The callback captures this test's locals by reference, so unregister it
  // before any assertion can return early; otherwise it would stay installed
  // with dangling references after the test body exits.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  ASSERT_OK(flush_status);

  ASSERT_NE(nullptr, snapshot);
  VerifyKeys({{"key1", "value2"}});
  VerifyKeys({{"key1", "value1"}}, snapshot);
  db->ReleaseSnapshot(snapshot);
}
TEST_P(WritePreparedTransactionTest, Iterate) {
  // Asserts that `it` is positioned on a valid entry holding exactly
  // (want_key, want_value).
  auto expect_entry = [](Iterator* it, const std::string& want_key,
                         const std::string& want_value) {
    ASSERT_TRUE(it->Valid());
    ASSERT_OK(it->status());
    ASSERT_EQ(want_key, it->key().ToString());
    ASSERT_EQ(want_value, it->value().ToString());
  };

  // Walks the keys "a", "foo" and "z" with both a DB iterator and an iterator
  // obtained from a concurrent transaction, checking that the two have the
  // same view and that "foo" maps to `foo_value`.
  auto check_iterators = [&](const std::string& foo_value) {
    auto* reader_txn = db->BeginTransaction(WriteOptions());
    for (const bool from_txn : {false, true}) {
      Iterator* it = from_txn ? reader_txn->GetIterator(ReadOptions())
                              : db->NewIterator(ReadOptions());
      // Seek lands directly on "foo".
      it->Seek("foo");
      expect_entry(it, "foo", foo_value);
      // Next from "a" moves forward to "foo".
      it->Seek("a");
      expect_entry(it, "a", "va");
      it->Next();
      expect_entry(it, "foo", foo_value);
      // SeekForPrev("y") falls back to "foo".
      it->SeekForPrev("y");
      expect_entry(it, "foo", foo_value);
      // Prev from "z" moves backward to "foo".
      it->SeekForPrev("z");
      expect_entry(it, "z", "vz");
      it->Prev();
      expect_entry(it, "foo", foo_value);
      delete it;
    }
    delete reader_txn;
  };

  ASSERT_OK(db->Put(WriteOptions(), "foo", "v1"));
  // Prepare, but do not yet commit, an overwrite of "foo".
  auto* writer_txn = db->BeginTransaction(WriteOptions());
  ASSERT_OK(writer_txn->SetName("txn"));
  ASSERT_OK(writer_txn->Put("foo", "v2"));
  ASSERT_OK(writer_txn->Prepare());
  VerifyKeys({{"foo", "v1"}});

  // Neighbouring keys used by the Next/Prev checks.
  ASSERT_OK(db->Put(WriteOptions(), "a", "va"));
  ASSERT_OK(db->Put(WriteOptions(), "z", "vz"));

  // Before the commit readers see v1; after it they see v2.
  check_iterators("v1");
  ASSERT_OK(writer_txn->Commit());
  VerifyKeys({{"foo", "v2"}});
  check_iterators("v2");
  delete writer_txn;
}
// Refresh() on an iterator obtained from this DB must report NotSupported.
TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
  Iterator* const db_iter = db->NewIterator(ReadOptions());
  const auto refresh_status = db_iter->Refresh();
  ASSERT_TRUE(refresh_status.IsNotSupported());
  delete db_iter;
}
// Committing a delayed prepared has two non-atomic steps: update commit cache,
// remove seq from delayed_prepared_. The read in IsInSnapshot also involves two
// non-atomic steps of checking these two data structures. This test breaks each
// in the middle to ensure correctness in spite of non-atomic execution.
// Note: This test is limited to the case where snapshot is larger than the
// max_evicted_seq_.
// Runs a snapshot read of "key1" concurrently with the commit of a prepared
// transaction on the same key, using sync-point dependencies to interleave
// the two at three different places (two read-side splits, one commit-side
// split). In every case the snapshot read must return the initial value.
TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  for (auto split_read : {true, false}) {
    std::vector<bool> split_options = {false};
    if (split_read) {
      // Also test for break before mutex
      split_options.push_back(true);
    }
    for (auto split_before_mutex : split_options) {
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      ReOpen();
      WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
      DBImpl* db_impl = reinterpret_cast<DBImpl*>(db->GetRootDB());
      // Fill up the commit cache. The write statuses are checked because the
      // rest of the test relies on these writes having happened.
      std::string init_value("value1");
      for (int i = 0; i < 10; i++) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
      }
      // Prepare a transaction but do not commit it
      Transaction* txn =
          db->BeginTransaction(WriteOptions(), TransactionOptions());
      ASSERT_OK(txn->SetName("xid"));
      ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
      ASSERT_OK(txn->Prepare());
      // Commit a bunch of entries to advance max evicted seq and make the
      // prepared a delayed prepared
      for (int i = 0; i < 10; i++) {
        ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      }
      // The snapshot should not see the delayed prepared entry
      auto snap = db->GetSnapshot();

      if (split_read) {
        if (split_before_mutex) {
          // split before acquiring prepare_mutex_
          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
              {{"WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:pause",
                "AtomicCommitOfDelayedPrepared:Commit:before"},
               {"AtomicCommitOfDelayedPrepared:Commit:after",
                "WritePreparedTxnDB::IsInSnapshot:prepared_mutex_:resume"}});
        } else {
          // split right after reading from the commit cache
          ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
              {{"WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:pause",
                "AtomicCommitOfDelayedPrepared:Commit:before"},
               {"AtomicCommitOfDelayedPrepared:Commit:after",
                "WritePreparedTxnDB::IsInSnapshot:GetCommitEntry:resume"}});
        }
      } else {  // split commit
        // split right before removing from delayed_prepared_
        ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
            {{"WritePreparedTxnDB::RemovePrepared:pause",
              "AtomicCommitOfDelayedPrepared:Read:before"},
             {"AtomicCommitOfDelayedPrepared:Read:after",
              "WritePreparedTxnDB::RemovePrepared:resume"}});
      }
      SyncPoint::GetInstance()->EnableProcessing();

      ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:before");
        ASSERT_OK(txn->Commit());
        if (split_before_mutex) {
          // Do bunch of inserts to evict the commit entry from the cache. This
          // would prevent the 2nd look into commit cache under prepare_mutex_
          // to see the commit entry.
          auto seq = db_impl->TEST_GetLastVisibleSequence();
          size_t tries = 0;
          while (wp_db->max_evicted_seq_ < seq && tries < 50) {
            ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
            tries++;
          }
          // The eviction must have happened within the retry budget.
          ASSERT_LT(tries, 50);
        }
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Commit:after");
        delete txn;
      });
      ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:before");
        ReadOptions roptions;
        roptions.snapshot = snap;
        PinnableSlice value;
        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
        ASSERT_OK(s);
        // It should not see the commit of delayed prepared
        ASSERT_TRUE(value == init_value);
        TEST_SYNC_POINT("AtomicCommitOfDelayedPrepared:Read:after");
        db->ReleaseSnapshot(snap);
      });
      read_thread.join();
      commit_thread.join();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }  // for split_before_mutex
  }    // for split_read
}
// When max evicted seq advances a prepared seq, it involves two updates: i)
// adding prepared seq to delayed_prepared_, ii) updating max_evicted_seq_.
// ::IsInSnapshot also reads these two values in a non-atomic way. This test
// ensures correctness if the update occurs after ::IsInSnapshot reads
// delayed_prepared_empty_ and before it reads max_evicted_seq_.
// Note: this test focuses on read snapshot larger than max_evicted_seq_.
// Pauses a snapshot read at the
// "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_" sync point while
// another thread commits entries until max_evicted_seq_ reaches the prepared
// transaction's id, then checks the read still returns the initial value.
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Fill up the commit cache. The write statuses are checked because the rest
  // of the test relies on these writes having happened.
  std::string init_value("value1");
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
  }
  // Prepare a transaction but do not commit it
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
  ASSERT_OK(txn->Prepare());
  // Create a gap between prepare seq and snapshot seq
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  // The snapshot should not see the delayed prepared entry
  auto snap = db->GetSnapshot();
  ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());

  // split right after reading delayed_prepared_empty_
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:pause",
        "AtomicUpdateOfDelayedPrepared:before"},
       {"AtomicUpdateOfDelayedPrepared:after",
        "WritePreparedTxnDB::IsInSnapshot:delayed_prepared_empty_:resume"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:before");
    // Commit a bunch of entries to advance max evicted seq and make the
    // prepared a delayed prepared
    size_t tries = 0;
    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
      ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      tries++;
    }
    // max_evicted_seq_ must have caught up within the retry budget.
    ASSERT_LT(tries, 50);
    // This is the case on which the test focuses
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    TEST_SYNC_POINT("AtomicUpdateOfDelayedPrepared:after");
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    ReadOptions roptions;
    roptions.snapshot = snap;
    PinnableSlice value;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
    ASSERT_OK(s);
    // It should not see the uncommitted value of delayed prepared
    ASSERT_TRUE(value == init_value);
    db->ReleaseSnapshot(snap);
  });
  read_thread.join();
  commit_thread.join();
  ASSERT_OK(txn->Commit());
  delete txn;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Eviction from commit cache and update of max evicted seq are two non-atomic
// steps. Similarly the read of max_evicted_seq_ in ::IsInSnapshot and reading
// from commit cache are two non-atomic steps. This tests if the update occurs
// after reading max_evicted_seq_ and before reading the commit cache.
// Note: the test focuses on snapshot larger than max_evicted_seq_
// Pauses a snapshot read at the
// "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_" sync point while
// another thread commits entries until max_evicted_seq_ reaches the id of an
// already-committed transaction, then checks the read returns that
// transaction's value.
TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
  const size_t snapshot_cache_bits = 7;  // same as default
  const size_t commit_cache_bits = 3;    // 8 entries
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  // Fill up the commit cache. The write statuses are checked because the rest
  // of the test relies on these writes having happened.
  std::string init_value("value1");
  std::string last_value("value_final");
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
  }
  // Do an uncommitted write to prevent min_uncommitted optimization
  Transaction* txn1 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put(Slice("key0"), last_value));
  ASSERT_OK(txn1->Prepare());
  // Do a write with prepare to get the prepare seq
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key1"), last_value));
  ASSERT_OK(txn->Prepare());
  ASSERT_OK(txn->Commit());
  // Create a gap between commit entry and snapshot seq
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
  // The snapshot should see the last commit
  auto snap = db->GetSnapshot();
  ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());

  // split right after reading max_evicted_seq_
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:pause",
        "NonAtomicUpdateOfMaxEvictedSeq:before"},
       {"NonAtomicUpdateOfMaxEvictedSeq:after",
        "WritePreparedTxnDB::IsInSnapshot:max_evicted_seq_:resume"}});
  SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread commit_thread([&]() {
    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:before");
    // Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
    size_t tries = 0;
    while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
      ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
      tries++;
    }
    // max_evicted_seq_ must have caught up within the retry budget.
    ASSERT_LT(tries, 50);
    // This is the case on which the test focuses
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    TEST_SYNC_POINT("NonAtomicUpdateOfMaxEvictedSeq:after");
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    ReadOptions roptions;
    roptions.snapshot = snap;
    PinnableSlice value;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key1", &value);
    ASSERT_OK(s);
    // It should see the committed value of the evicted entry
    ASSERT_TRUE(value == last_value);
    db->ReleaseSnapshot(snap);
  });
  read_thread.join();
  commit_thread.join();
  delete txn;
  // Finish the transaction that was only kept prepared to block the
  // min_uncommitted optimization; its commit status is checked as well.
  ASSERT_OK(txn1->Commit());
  delete txn1;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// Test when we add a prepared seq when the max_evicted_seq_ already goes beyond
// that. The test focuses on a race condition between AddPrepared and
// AdvanceMaxEvictedSeq functions.
// Interleaves a Prepare() in one thread with two commits in another (via
// sync-point dependencies) and checks that a snapshot taken afterwards does
// not see the value written by the still-uncommitted transaction.
TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
  if (!options.two_write_queues) {
    // This test is only for two write queues
    return;
  }
  const size_t snapshot_cache_bits = 7;  // same as default
  // 1 entry to advance max after the 2nd commit
  const size_t commit_cache_bits = 0;
  UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
  ReOpen();
  WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
  std::string some_value("value_some");
  std::string uncommitted_value("value_uncommitted");
  // Prepare two uncommitted transactions
  Transaction* txn1 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put(Slice("key1"), some_value));
  ASSERT_OK(txn1->Prepare());
  Transaction* txn2 =
      db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn2->SetName("xid2"));
  ASSERT_OK(txn2->Put(Slice("key2"), some_value));
  ASSERT_OK(txn2->Prepare());
  // Start the txn here so the other thread could get its id
  Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
  ASSERT_OK(txn->SetName("xid"));
  ASSERT_OK(txn->Put(Slice("key0"), uncommitted_value));
  // Guards access to `txn` between write_thread (Prepare) and read_thread
  // (GetId).
  port::Mutex txn_mutex_;

  // t1) Insert prepared entry, t2) commit other entries to advance max
  // evicted seq and finish checking the existing prepared entries, t1)
  // AddPrepared, t2) update max_evicted_seq_
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      {"AddPreparedCallback::AddPrepared::begin:pause",
       "AddPreparedBeforeMax::read_thread:start"},
      {"AdvanceMaxEvictedSeq::update_max:pause",
       "AddPreparedCallback::AddPrepared::begin:resume"},
      {"AddPreparedCallback::AddPrepared::end",
       "AdvanceMaxEvictedSeq::update_max:resume"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
    txn_mutex_.Lock();
    const auto prepare_status = txn->Prepare();
    txn_mutex_.Unlock();
    // Assert only after unlocking: a failed ASSERT_* returns from the lambda,
    // and doing so with the mutex held would block read_thread forever.
    ASSERT_OK(prepare_status);
  });
  ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
    TEST_SYNC_POINT("AddPreparedBeforeMax::read_thread:start");
    // Publish seq number with a commit
    ASSERT_OK(txn1->Commit());
    // Since the commit cache size is one the 2nd commit evict the 1st one and
    // invokes AdvanceMaxEvictedSeq
    ASSERT_OK(txn2->Commit());

    ReadOptions roptions;
    PinnableSlice value;
    // The snapshot should not see the uncommitted value from write_thread
    auto snap = db->GetSnapshot();
    ASSERT_LT(wp_db->max_evicted_seq_, snap->GetSequenceNumber());
    // This is the scenario that we test for. Both values are read under the
    // mutex, but compared only after it is released so that a failing
    // assertion cannot leave the mutex locked.
    txn_mutex_.Lock();
    const SequenceNumber max_evicted = wp_db->max_evicted_seq_;
    const auto txn_id = txn->GetId();
    txn_mutex_.Unlock();
    ASSERT_GT(max_evicted, txn_id);
    roptions.snapshot = snap;
    auto s = db->Get(roptions, db->DefaultColumnFamily(), "key0", &value);
    ASSERT_TRUE(s.IsNotFound());
    db->ReleaseSnapshot(snap);
  });

  read_thread.join();
  write_thread.join();
  delete txn1;
  delete txn2;
  ASSERT_OK(txn->Commit());
  delete txn;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
// When an old prepared entry gets committed, there is a gap between the time
// that it is published and when it is cleaned up from old_prepared_. This test
// stresses such cases.
// For several commit cache sizes and sub-batch counts, runs a writer thread
// (prepare + commit of "key2") against an eviction thread (plain writes to
// "key1"). A snapshot is taken from the "RemovePrepared:Start" callback of
// each writer commit, and the writer then checks that reading through that
// snapshot returns its own write and matches the value read when the
// snapshot was taken.
TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
  const size_t snapshot_cache_bits = 7;  // same as default
  for (const size_t commit_cache_bits : {0, 2, 3}) {
    for (const size_t sub_batch_cnt : {1, 2, 3}) {
      UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
      ReOpen();
      std::atomic<const Snapshot*> snap = {nullptr};
      std::atomic<SequenceNumber> exp_prepare = {0};
      ROCKSDB_NAMESPACE::port::Thread callback_thread;
      // Value is synchronized via snap
      PinnableSlice value;
      // Take a snapshot after publish and before RemovePrepared:Start
      auto snap_callback = [&]() {
        ASSERT_EQ(nullptr, snap.load());
        snap.store(db->GetSnapshot());
        ReadOptions roptions;
        roptions.snapshot = snap.load();
        auto s = db->Get(roptions, db->DefaultColumnFamily(), "key2", &value);
        ASSERT_OK(s);
      };
      auto callback = [&](void* param) {
        // The sync point passes a pointer to the prepare sequence number.
        SequenceNumber prep_seq = *static_cast<SequenceNumber*>(param);
        if (prep_seq == exp_prepare.load()) {  // only for write_thread
          // We need to spawn a thread to avoid deadlock since getting a
          // snapshot might end up calling AdvanceSeqByOne which needs joining
          // the write queue.
          callback_thread = ROCKSDB_NAMESPACE::port::Thread(snap_callback);
          TEST_SYNC_POINT("callback:end");
        }
      };
      // Wait for the first snapshot be taken in GetSnapshotInternal. Although
      // it might be updated before GetSnapshotInternal finishes but this should
      // cover most of the cases.
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
          {"WritePreparedTxnDB::GetSnapshotInternal:first", "callback:end"},
      });
      SyncPoint::GetInstance()->SetCallBack("RemovePrepared:Start", callback);
      SyncPoint::GetInstance()->EnableProcessing();
      // Thread to cause frequent evictions
      ROCKSDB_NAMESPACE::port::Thread eviction_thread([&]() {
        // Too many txns might cause commit_seq - prepare_seq in another thread
        // to go beyond DELTA_UPPERBOUND
        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
          // Check the write status instead of silently dropping it.
          ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("value1")));
        }
      });
      ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
        for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
          Transaction* txn =
              db->BeginTransaction(WriteOptions(), TransactionOptions());
          ASSERT_OK(txn->SetName("xid"));
          std::string val_str = "value" + ToString(i);
          for (size_t b = 0; b < sub_batch_cnt; b++) {
            ASSERT_OK(txn->Put(Slice("key2"), val_str));
          }
          ASSERT_OK(txn->Prepare());
          // Let an eviction to kick in
          std::this_thread::yield();

          // Tell the callback which prepare sequence belongs to this thread.
          exp_prepare.store(txn->GetId());
          ASSERT_OK(txn->Commit());
          delete txn;
          // Wait for the snapshot taking that is triggered by
          // RemovePrepared:Start callback
          callback_thread.join();

          // Read with the snapshot taken before delayed_prepared_ cleanup
          ReadOptions roptions;
          roptions.snapshot = snap.load();
          ASSERT_NE(nullptr, roptions.snapshot);
          PinnableSlice value2;
          auto s =
              db->Get(roptions, db->DefaultColumnFamily(), "key2", &value2);
          ASSERT_OK(s);
          // It should see its own write
          ASSERT_TRUE(val_str == value2);
          // The value read by snapshot should not change
          ASSERT_STREQ(value2.ToString().c_str(), value.ToString().c_str());

          db->ReleaseSnapshot(roptions.snapshot);
          snap.store(nullptr);
        }
      });
      write_thread.join();
      eviction_thread.join();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
      ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    }
  }
}
// Test that updating the commit map will not affect the existing snapshots
// Runs a writer (with and without a separate Prepare step) concurrently with
// a reader that takes a snapshot and reads "key" twice; sync-point
// dependencies place the reads around WritePreparedTxnDB::AddCommitted, and
// ASSERT_SAME compares the second read with the first.
TEST_P(WritePreparedTransactionTest, AtomicCommit) {
  for (bool skip_prepare : {true, false}) {
    // NOTE(review): the last successor name below ends with a trailing ':',
    // unlike the ":start:pause" name above. Confirm that it matches the sync
    // point name used in WritePreparedTxnDB::AddCommitted; if it does not,
    // that dependency never takes effect.
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
        {"WritePreparedTxnDB::AddCommitted:start",
         "AtomicCommit::GetSnapshot:start"},
        {"AtomicCommit::Get:end",
         "WritePreparedTxnDB::AddCommitted:start:pause"},
        {"WritePreparedTxnDB::AddCommitted:end", "AtomicCommit::Get2:start"},
        {"AtomicCommit::Get2:end",
         "WritePreparedTxnDB::AddCommitted:end:pause:"},
    });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
    ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
      if (skip_prepare) {
        // Check the write status instead of silently dropping it.
        ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
      } else {
        Transaction* txn =
            db->BeginTransaction(WriteOptions(), TransactionOptions());
        ASSERT_OK(txn->SetName("xid"));
        ASSERT_OK(txn->Put(Slice("key"), Slice("value")));
        ASSERT_OK(txn->Prepare());
        ASSERT_OK(txn->Commit());
        delete txn;
      }
    });
    ROCKSDB_NAMESPACE::port::Thread read_thread([&]() {
      ReadOptions roptions;
      TEST_SYNC_POINT("AtomicCommit::GetSnapshot:start");
      roptions.snapshot = db->GetSnapshot();
      PinnableSlice val;
      // First read through the snapshot.
      auto s = db->Get(roptions, db->DefaultColumnFamily(), "key", &val);
      TEST_SYNC_POINT("AtomicCommit::Get:end");
      TEST_SYNC_POINT("AtomicCommit::Get2:start");
      // Second read through the same snapshot, compared with the first.
      ASSERT_SAME(roptions, db, s, val, "key");
      TEST_SYNC_POINT("AtomicCommit::Get2:end");
      db->ReleaseSnapshot(roptions.snapshot);
    });
    read_thread.join();
    write_thread.join();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
}
// Test that we can change write policy from WriteCommitted to WritePrepared
// after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WP_WC_DBBackwardCompatibility) {
  // Third argument: the WAL is empty when the write policy changes.
  const bool wal_is_empty = true;
  CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, wal_is_empty);
}
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WriteCommitted to WritePrepared
TEST_P(WritePreparedTransactionTest, WP_WC_WALBackwardIncompatibility) {
  // Third argument: the WAL is NOT empty when the write policy changes.
  const bool wal_is_empty = false;
  CrossCompatibilityTest(WRITE_COMMITTED, WRITE_PREPARED, wal_is_empty);
}
// Test that we can change write policy from WritePrepare back to WriteCommitted
// after a clean shutdown (which would empty the WAL)
TEST_P(WritePreparedTransactionTest, WC_WP_ForwardCompatibility) {
  // Third argument: the WAL is empty when the write policy changes.
  const bool wal_is_empty = true;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, wal_is_empty);
}
// Test that we fail fast if WAL is not emptied between changing the write
// policy from WritePrepared to WriteCommitted
TEST_P(WritePreparedTransactionTest, WC_WP_WALForwardIncompatibility) {
  // Third argument: the WAL is NOT empty when the write policy changes.
  const bool wal_is_empty = false;
  CrossCompatibilityTest(WRITE_PREPARED, WRITE_COMMITTED, wal_is_empty);
}
} // namespace ROCKSDB_NAMESPACE
// Test binary entry point: hands the command line to googletest and returns
// the result of running all registered tests.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}
#else
#include <stdio.h>
// ROCKSDB_LITE entry point: there is nothing to run, so report the skip on
// stderr and exit successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as Transactions are not supported in ROCKSDB_LITE\n", stderr);
  return 0;
}
#endif // ROCKSDB_LITE