Advance max evicted seq in coarser granularity

Summary:
This patch advances the max_evicted_seq_ is larger granularities to reduce the overhead of updating the relevant data structures.

It also refactor the related code and adds testing to that. As part of this patch some of the TODOs for removing usage of non-static const members are also addressed.
Closes https://github.com/facebook/rocksdb/pull/2844

Differential Revision: D5772928

Pulled By: maysamyabandeh

fbshipit-source-id: f4fcc2948be69c034f10812cf922ce5ab82ef98c
This commit is contained in:
Maysam Yabandeh 2017-09-08 14:32:30 -07:00 committed by Facebook Github Bot
parent dcd36a6aee
commit fce6c892ab
3 changed files with 154 additions and 87 deletions

View file

@ -604,8 +604,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
if (to_be_evicted) { if (to_be_evicted) {
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
if (prev_max < evicted.commit_seq) { if (prev_max < evicted.commit_seq) {
// TODO(myabandeh) inc max in larger steps to avoid frequent updates // Inc max in larger steps to avoid frequent updates
auto max_evicted_seq = evicted.commit_seq; auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
} }
// After each eviction from commit cache, check if the commit entry should // After each eviction from commit cache, check if the commit entry should
@ -683,30 +683,22 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
} }
} }
// With each change to max_evicted_seq_ fetch the live snapshots behind it // With each change to max_evicted_seq_ fetch the live snapshots behind it.
SequenceNumber curr_seq; // We use max as the version of snapshots to identify how fresh are the
// snapshot list. This works because the snapshots are between 0 and
// max, so the larger the max, the more complete they are.
SequenceNumber new_snapshots_version = new_max;
std::vector<SequenceNumber> snapshots; std::vector<SequenceNumber> snapshots;
bool update_snapshots = false; bool update_snapshots = false;
{ if (new_snapshots_version > snapshots_version_) {
InstrumentedMutex(db_impl_->mutex()); // This is to avoid updating the snapshots_ if it already updated
// We use this to identify how fresh are the snapshot list. Since this // with a more recent vesion by a concrrent thread
// is done atomically with obtaining the snapshot list, the one with update_snapshots = true;
// the larger seq is more fresh. If the seq is equal the full snapshot // We only care about snapshots lower then max
// list could be different since taking snapshots does not increase snapshots = GetSnapshotListFromDB(new_max);
// the db seq. However since we only care about snapshots before the
// new max, such recent snapshots would not be included the in the
// list anyway.
curr_seq = db_impl_->GetLatestSequenceNumber();
if (curr_seq > snapshots_version_) {
// This is to avoid updating the snapshots_ if it already updated
// with a more recent vesion by a concrrent thread
update_snapshots = true;
// We only care about snapshots lower then max
snapshots = db_impl_->snapshots().GetAll(nullptr, new_max);
}
} }
if (update_snapshots) { if (update_snapshots) {
UpdateSnapshots(snapshots, curr_seq); UpdateSnapshots(snapshots, new_snapshots_version);
} }
// TODO(myabandeh): check if it worked with relaxed ordering // TODO(myabandeh): check if it worked with relaxed ordering
while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak(
@ -715,10 +707,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max,
}; };
} }
// 10m entry, 80MB size const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21); SequenceNumber max) {
size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = InstrumentedMutex(db_impl_->mutex());
static_cast<size_t>(1 << 7); return db_impl_->snapshots().GetAll(nullptr, max);
}
void WritePreparedTxnDB::UpdateSnapshots( void WritePreparedTxnDB::UpdateSnapshots(
const std::vector<SequenceNumber>& snapshots, const std::vector<SequenceNumber>& snapshots,

View file

@ -123,6 +123,7 @@ class PessimisticTransactionDB : public TransactionDB {
private: private:
friend class WritePreparedTxnDB; friend class WritePreparedTxnDB;
friend class WritePreparedTxnDBMock;
const TransactionDBOptions txn_db_options_; const TransactionDBOptions txn_db_options_;
TransactionLockMgr lock_mgr_; TransactionLockMgr lock_mgr_;
@ -166,19 +167,23 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB {
// mechanisms to tell such data apart from committed data. // mechanisms to tell such data apart from committed data.
class WritePreparedTxnDB : public PessimisticTransactionDB { class WritePreparedTxnDB : public PessimisticTransactionDB {
public: public:
explicit WritePreparedTxnDB(DB* db, explicit WritePreparedTxnDB(
const TransactionDBOptions& txn_db_options) DB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE,
size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE)
: PessimisticTransactionDB(db, txn_db_options), : PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE), SNAPSHOT_CACHE_SIZE(snapshot_cache_size),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { COMMIT_CACHE_SIZE(commit_cache_size) {
init(txn_db_options); init(txn_db_options);
} }
explicit WritePreparedTxnDB(StackableDB* db, explicit WritePreparedTxnDB(
const TransactionDBOptions& txn_db_options) StackableDB* db, const TransactionDBOptions& txn_db_options,
size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE,
size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE)
: PessimisticTransactionDB(db, txn_db_options), : PessimisticTransactionDB(db, txn_db_options),
SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE), SNAPSHOT_CACHE_SIZE(snapshot_cache_size),
COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { COMMIT_CACHE_SIZE(commit_cache_size) {
init(txn_db_options); init(txn_db_options);
} }
@ -204,8 +209,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test; friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test;
friend class WritePreparedTransactionTest; friend class WritePreparedTransactionTest;
friend class PreparedHeap_BasicsTest_Test; friend class PreparedHeap_BasicsTest_Test;
friend class WritePreparedTxnDBMock;
friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test;
void init(const TransactionDBOptions& /* unused */) { void init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
INC_STEP_FOR_MAX_EVICTED =
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>( snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {}); new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = commit_cache_ =
@ -275,6 +286,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// largetst new_max value. // largetst new_max value.
void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max); void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max);
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber max);
// Update the list of snapshots corresponding to the soon-to-be-updated // Update the list of snapshots corresponding to the soon-to-be-updated
// max_eviceted_seq_. Thread-safety: this function can be called concurrently. // max_eviceted_seq_. Thread-safety: this function can be called concurrently.
// The concurrent invocations of this function is equivalent to a serial // The concurrent invocations of this function is equivalent to a serial
@ -310,8 +324,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for
// each entry. In x86_64 architecture such reads are compiled to simple read // each entry. In x86_64 architecture such reads are compiled to simple read
// instructions. 128 entries // instructions. 128 entries
// TODO(myabandeh): avoid non-const static variables static const size_t DEF_SNAPSHOT_CACHE_SIZE = static_cast<size_t>(1 << 7);
static size_t DEF_SNAPSHOT_CACHE_SIZE;
const size_t SNAPSHOT_CACHE_SIZE; const size_t SNAPSHOT_CACHE_SIZE;
unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_; unique_ptr<std::atomic<SequenceNumber>[]> snapshot_cache_;
// 2nd list for storing snapshots. The list sorted in ascending order. // 2nd list for storing snapshots. The list sorted in ascending order.
@ -324,14 +337,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// A heap of prepared transactions. Thread-safety is provided with // A heap of prepared transactions. Thread-safety is provided with
// prepared_mutex_. // prepared_mutex_.
PreparedHeap prepared_txns_; PreparedHeap prepared_txns_;
// TODO(myabandeh): avoid non-const static variables // 10m entry, 80MB size
static size_t DEF_COMMIT_CACHE_SIZE; static const size_t DEF_COMMIT_CACHE_SIZE = static_cast<size_t>(1 << 21);
const size_t COMMIT_CACHE_SIZE; const size_t COMMIT_CACHE_SIZE;
// commit_cache_ must be initialized to zero to tell apart an empty index from // commit_cache_ must be initialized to zero to tell apart an empty index from
// a filled one. Thread-safety is provided with commit_cache_mutex_. // a filled one. Thread-safety is provided with commit_cache_mutex_.
unique_ptr<CommitEntry[]> commit_cache_; unique_ptr<CommitEntry[]> commit_cache_;
// The largest evicted *commit* sequence number from the commit_cache_ // The largest evicted *commit* sequence number from the commit_cache_
std::atomic<uint64_t> max_evicted_seq_ = {}; std::atomic<uint64_t> max_evicted_seq_ = {};
// Advance max_evicted_seq_ by this value each time it needs an update. The
// larger the value, the less frequent advances we would have. We do not want
// it to be too large either as it would cause stalls by doing too much
// maintenance work under the lock.
size_t INC_STEP_FOR_MAX_EVICTED = 1;
// A map of the evicted entries from commit_cache_ that has to be kept around // A map of the evicted entries from commit_cache_ that has to be kept around
// to service the old snapshots. This is expected to be empty normally. // to service the old snapshots. This is expected to be empty normally.
// Thread-safety is provided with old_commit_map_mutex_. // Thread-safety is provided with old_commit_map_mutex_.

View file

@ -106,6 +106,32 @@ TEST(PreparedHeap, BasicsTest) {
ASSERT_TRUE(heap.empty()); ASSERT_TRUE(heap.empty());
} }
class WritePreparedTxnDBMock : public WritePreparedTxnDB {
public:
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt)
: WritePreparedTxnDB(db_impl, opt) {}
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
size_t snapshot_cache_size)
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {}
WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt,
size_t snapshot_cache_size, size_t commit_cache_size)
: WritePreparedTxnDB(db_impl, opt, snapshot_cache_size,
commit_cache_size) {}
void SetDBSnapshots(const std::vector<SequenceNumber>& snapshots) {
snapshots_ = snapshots;
}
void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); }
protected:
virtual const std::vector<SequenceNumber> GetSnapshotListFromDB(
SequenceNumber /* unused */) override {
return snapshots_;
}
private:
std::vector<SequenceNumber> snapshots_;
};
class WritePreparedTransactionTest : public TransactionTest { class WritePreparedTransactionTest : public TransactionTest {
protected: protected:
// If expect_update is set, check if it actually updated old_commit_map_. If // If expect_update is set, check if it actually updated old_commit_map_. If
@ -309,12 +335,9 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) {
TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) {
std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l, std::vector<SequenceNumber> snapshots = {100l, 200l, 300l, 400l,
500l, 600l, 700l}; 500l, 600l, 700l};
// will take effect after ReOpen DBImpl* mock_db = new DBImpl(options, dbname);
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = snapshots.size() / 2; std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
ReOpen(); // to restart the db mock_db, txn_db_options, snapshots.size() / 2));
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
SequenceNumber version = 1000l; SequenceNumber version = 1000l;
ASSERT_EQ(0, wp_db->snapshots_total_); ASSERT_EQ(0, wp_db->snapshots_total_);
wp_db->UpdateSnapshots(snapshots, version); wp_db->UpdateSnapshots(snapshots, version);
@ -353,16 +376,12 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
60l, 70l, 80l, 90l, 100l}; 60l, 70l, 80l, 90l, 100l};
SequenceNumber version = 1000l; SequenceNumber version = 1000l;
// Choose the cache size so that the new snapshot list could replace all the // Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow Will take effect // existing items in the cache and also have some overflow.
// after ReOpen DBImpl* mock_db = new DBImpl(options, dbname);
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = (snapshots.size() - 2) / 2; std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
ReOpen(); // to restart the db mock_db, txn_db_options, (snapshots.size() - 2) / 2));
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
// Add up to 2 items that do not fit into the cache // Add up to 2 items that do not fit into the cache
for (size_t old_size = 1; for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + 2;
old_size <= WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE + 2;
old_size++) { old_size++) {
const std::vector<SequenceNumber> old_snapshots( const std::vector<SequenceNumber> old_snapshots(
snapshots.begin(), snapshots.begin() + old_size); snapshots.begin(), snapshots.begin() + old_size);
@ -397,21 +416,17 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
// The critical part is when iterating the snapshot cache. Afterwards, // The critical part is when iterating the snapshot cache. Afterwards,
// we are operating under the lock // we are operating under the lock
size_t a_range = size_t a_range =
std::min(old_snapshots.size(), std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) +
1;
size_t b_range = size_t b_range =
std::min(new_snapshots.size(), std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1;
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) +
1;
// Break each thread at two points // Break each thread at two points
for (size_t a1 = 1; a1 <= a_range; a1++) { for (size_t a1 = 1; a1 <= a_range; a1++) {
for (size_t a2 = a1 + 1; a2 <= a_range; a2++) { for (size_t a2 = a1 + 1; a2 <= a_range; a2++) {
for (size_t b1 = 1; b1 <= b_range; b1++) { for (size_t b1 = 1; b1 <= b_range; b1++) {
for (size_t b2 = b1 + 1; b2 <= b_range; b2++) { for (size_t b2 = b1 + 1; b2 <= b_range; b2++) {
SnapshotConcurrentAccessTestInternal(wp_db, old_snapshots, SnapshotConcurrentAccessTestInternal(
new_snapshots, entry, wp_db.get(), old_snapshots, new_snapshots, entry, version,
version, a1, a2, b1, b2); a1, a2, b1, b2);
} }
} }
} }
@ -423,16 +438,73 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) {
} }
#endif #endif
// This test clarifies the contract of AdvanceMaxEvictedSeq method
TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) {
DBImpl* mock_db = new DBImpl(options, dbname);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
new WritePreparedTxnDBMock(mock_db, txn_db_options));
// 1. Set the initial values for max, prepared, and snapshots
SequenceNumber zero_max = 0l;
// Set the initial list of prepared txns
const std::vector<SequenceNumber> initial_prepared = {10, 30, 50, 100,
150, 200, 250};
for (auto p : initial_prepared) {
wp_db->AddPrepared(p);
}
// This updates the max value and also set old prepared
SequenceNumber init_max = 100;
wp_db->AdvanceMaxEvictedSeq(zero_max, init_max);
const std::vector<SequenceNumber> initial_snapshots = {20, 40};
wp_db->SetDBSnapshots(initial_snapshots);
// This will update the internal cache of snapshots from the DB
wp_db->UpdateSnapshots(initial_snapshots, init_max);
// 2. Invoke AdvanceMaxEvictedSeq
const std::vector<SequenceNumber> latest_snapshots = {20, 110, 220, 300};
wp_db->SetDBSnapshots(latest_snapshots);
SequenceNumber new_max = 200;
wp_db->AdvanceMaxEvictedSeq(init_max, new_max);
// 3. Verify that the state matches with AdvanceMaxEvictedSeq contract
// a. max should be updated to new_max
ASSERT_EQ(wp_db->max_evicted_seq_, new_max);
// b. delayed prepared should contain every txn <= max and prepared should
// only contian txns > max
auto it = initial_prepared.begin();
for (; it != initial_prepared.end() && *it <= new_max; it++) {
ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it));
}
ASSERT_TRUE(wp_db->delayed_prepared_.empty());
for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty();
it++, wp_db->prepared_txns_.pop()) {
ASSERT_EQ(*it, wp_db->prepared_txns_.top());
}
ASSERT_TRUE(it == initial_prepared.end());
ASSERT_TRUE(wp_db->prepared_txns_.empty());
// c. snapshots should contain everything below new_max
auto sit = latest_snapshots.begin();
for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max &&
i < wp_db->snapshots_total_;
sit++, i++) {
ASSERT_TRUE(i < wp_db->snapshots_total_);
// This test is in small scale and the list of snapshots are assumed to be
// within the cache size limit. This is just a safety check to double check
// that assumption.
ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE);
ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]);
}
}
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of // Test WritePreparedTxnDB's IsInSnapshot against different ordering of
// snapshot, max_committed_seq_, prepared, and commit entries. // snapshot, max_committed_seq_, prepared, and commit entries.
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
WriteOptions wo; WriteOptions wo;
// Use small commit cache to trigger lots of eviction and fast advance of // Use small commit cache to trigger lots of eviction and fast advance of
// max_evicted_seq_ // max_evicted_seq_
// will take effect after ReOpen const size_t commit_cache_size = 8;
WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8;
// Same for snapshot cache size // Same for snapshot cache size
WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5; const size_t snapshot_cache_size = 5;
// Take some preliminary snapshots first. This is to stress the data structure // Take some preliminary snapshots first. This is to stress the data structure
// that holds the old snapshots as it will be designed to be efficient when // that holds the old snapshots as it will be designed to be efficient when
@ -452,7 +524,6 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
uint64_t cur_txn = 0; uint64_t cur_txn = 0;
// Number of snapshots taken so far // Number of snapshots taken so far
int num_snapshots = 0; int num_snapshots = 0;
std::vector<const Snapshot*> to_be_released;
// Number of gaps applied so far // Number of gaps applied so far
int gap_cnt = 0; int gap_cnt = 0;
// The final snapshot that we will inspect // The final snapshot that we will inspect
@ -465,30 +536,23 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
// We keep the list of txns comitted before we take the last snaphot. // We keep the list of txns comitted before we take the last snaphot.
// These should be the only seq numbers that will be found in the snapshot // These should be the only seq numbers that will be found in the snapshot
std::set<uint64_t> committed_before; std::set<uint64_t> committed_before;
ReOpen(); // to restart the db DBImpl* mock_db = new DBImpl(options, dbname);
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db); std::unique_ptr<WritePreparedTxnDBMock> wp_db(new WritePreparedTxnDBMock(
assert(wp_db); mock_db, txn_db_options, snapshot_cache_size, commit_cache_size));
assert(wp_db->db_impl_);
// We continue until max advances a bit beyond the snapshot. // We continue until max advances a bit beyond the snapshot.
while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) {
// do prepare for a transaction // do prepare for a transaction
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++; seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddPrepared(seq); wp_db->AddPrepared(seq);
prepared.insert(seq); prepared.insert(seq);
// If cur_txn is not started, do prepare for it. // If cur_txn is not started, do prepare for it.
if (!cur_txn) { if (!cur_txn) {
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++; seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
cur_txn = seq; cur_txn = seq;
wp_db->AddPrepared(cur_txn); wp_db->AddPrepared(cur_txn);
} else { // else commit it } else { // else commit it
wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq
seq++; seq++;
ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq);
wp_db->AddCommitted(cur_txn, seq); wp_db->AddCommitted(cur_txn, seq);
if (!snapshot) { if (!snapshot) {
committed_before.insert(cur_txn); committed_before.insert(cur_txn);
@ -498,20 +562,15 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
if (num_snapshots < max_snapshots - 1) { if (num_snapshots < max_snapshots - 1) {
// Take preliminary snapshots // Take preliminary snapshots
auto tmp_snapshot = db->GetSnapshot(); wp_db->TakeSnapshot(seq);
to_be_released.push_back(tmp_snapshot);
num_snapshots++; num_snapshots++;
} else if (gap_cnt < max_gap) { } else if (gap_cnt < max_gap) {
// Wait for some gap before taking the final snapshot // Wait for some gap before taking the final snapshot
gap_cnt++; gap_cnt++;
} else if (!snapshot) { } else if (!snapshot) {
// Take the final snapshot if it is not already taken // Take the final snapshot if it is not already taken
auto tmp_snapshot = db->GetSnapshot(); snapshot = seq;
to_be_released.push_back(tmp_snapshot); wp_db->TakeSnapshot(snapshot);
snapshot = tmp_snapshot->GetSequenceNumber();
// We increase the db seq artificailly by a dummy Put. Check that this
// technique is effective and db seq is that same as ours.
ASSERT_EQ(snapshot, seq);
num_snapshots++; num_snapshots++;
} }
@ -548,9 +607,6 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
} }
ASSERT_TRUE(wp_db->delayed_prepared_.empty()); ASSERT_TRUE(wp_db->delayed_prepared_.empty());
ASSERT_TRUE(wp_db->prepared_txns_.empty()); ASSERT_TRUE(wp_db->prepared_txns_.empty());
for (auto s : to_be_released) {
db->ReleaseSnapshot(s);
}
} }
} }
} }