diff --git a/utilities/transactions/pessimistic_transaction_db.cc b/utilities/transactions/pessimistic_transaction_db.cc index f98d11d084..8ad7f9540d 100644 --- a/utilities/transactions/pessimistic_transaction_db.cc +++ b/utilities/transactions/pessimistic_transaction_db.cc @@ -604,8 +604,8 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, if (to_be_evicted) { auto prev_max = max_evicted_seq_.load(std::memory_order_acquire); if (prev_max < evicted.commit_seq) { - // TODO(myabandeh) inc max in larger steps to avoid frequent updates - auto max_evicted_seq = evicted.commit_seq; + // Inc max in larger steps to avoid frequent updates + auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED; AdvanceMaxEvictedSeq(prev_max, max_evicted_seq); } // After each eviction from commit cache, check if the commit entry should @@ -683,30 +683,22 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, } } - // With each change to max_evicted_seq_ fetch the live snapshots behind it - SequenceNumber curr_seq; + // With each change to max_evicted_seq_ fetch the live snapshots behind it. + // We use max as the version of snapshots to identify how fresh are the + // snapshot list. This works because the snapshots are between 0 and + // max, so the larger the max, the more complete they are. + SequenceNumber new_snapshots_version = new_max; std::vector snapshots; bool update_snapshots = false; - { - InstrumentedMutex(db_impl_->mutex()); - // We use this to identify how fresh are the snapshot list. Since this - // is done atomically with obtaining the snapshot list, the one with - // the larger seq is more fresh. If the seq is equal the full snapshot - // list could be different since taking snapshots does not increase - // the db seq. However since we only care about snapshots before the - // new max, such recent snapshots would not be included the in the - // list anyway. - curr_seq = db_impl_->GetLatestSequenceNumber(); - if (curr_seq > snapshots_version_) { - // This is to avoid updating the snapshots_ if it already updated - // with a more recent vesion by a concrrent thread - update_snapshots = true; - // We only care about snapshots lower then max - snapshots = db_impl_->snapshots().GetAll(nullptr, new_max); - } + if (new_snapshots_version > snapshots_version_) { + // This is to avoid updating the snapshots_ if it already updated + // with a more recent vesion by a concrrent thread + update_snapshots = true; + // We only care about snapshots lower then max + snapshots = GetSnapshotListFromDB(new_max); } if (update_snapshots) { - UpdateSnapshots(snapshots, curr_seq); + UpdateSnapshots(snapshots, new_snapshots_version); } // TODO(myabandeh): check if it worked with relaxed ordering while (prev_max < new_max && !max_evicted_seq_.compare_exchange_weak( @@ -715,10 +707,11 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(SequenceNumber& prev_max, }; } -// 10m entry, 80MB size -size_t WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = static_cast(1 << 21); -size_t WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = - static_cast(1 << 7); +const std::vector WritePreparedTxnDB::GetSnapshotListFromDB( + SequenceNumber max) { + InstrumentedMutex(db_impl_->mutex()); + return db_impl_->snapshots().GetAll(nullptr, max); +} void WritePreparedTxnDB::UpdateSnapshots( const std::vector& snapshots, diff --git a/utilities/transactions/pessimistic_transaction_db.h b/utilities/transactions/pessimistic_transaction_db.h index e9ea8b1fff..23ecdea297 100644 --- a/utilities/transactions/pessimistic_transaction_db.h +++ b/utilities/transactions/pessimistic_transaction_db.h @@ -123,6 +123,7 @@ class PessimisticTransactionDB : public TransactionDB { private: friend class WritePreparedTxnDB; + friend class WritePreparedTxnDBMock; const TransactionDBOptions txn_db_options_; TransactionLockMgr lock_mgr_; @@ -166,19 +167,23 @@ class WriteCommittedTxnDB : public PessimisticTransactionDB { // mechanisms to tell such data apart from committed data. class WritePreparedTxnDB : public PessimisticTransactionDB { public: - explicit WritePreparedTxnDB(DB* db, - const TransactionDBOptions& txn_db_options) + explicit WritePreparedTxnDB( + DB* db, const TransactionDBOptions& txn_db_options, + size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, + size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE), - COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { + SNAPSHOT_CACHE_SIZE(snapshot_cache_size), + COMMIT_CACHE_SIZE(commit_cache_size) { init(txn_db_options); } - explicit WritePreparedTxnDB(StackableDB* db, - const TransactionDBOptions& txn_db_options) + explicit WritePreparedTxnDB( + StackableDB* db, const TransactionDBOptions& txn_db_options, + size_t snapshot_cache_size = DEF_SNAPSHOT_CACHE_SIZE, + size_t commit_cache_size = DEF_COMMIT_CACHE_SIZE) : PessimisticTransactionDB(db, txn_db_options), - SNAPSHOT_CACHE_SIZE(DEF_SNAPSHOT_CACHE_SIZE), - COMMIT_CACHE_SIZE(DEF_COMMIT_CACHE_SIZE) { + SNAPSHOT_CACHE_SIZE(snapshot_cache_size), + COMMIT_CACHE_SIZE(commit_cache_size) { init(txn_db_options); } @@ -204,8 +209,14 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { friend class WritePreparedTransactionTest_SnapshotConcurrentAccessTest_Test; friend class WritePreparedTransactionTest; friend class PreparedHeap_BasicsTest_Test; + friend class WritePreparedTxnDBMock; + friend class WritePreparedTransactionTest_AdvanceMaxEvictedSeqBasicTest_Test; void init(const TransactionDBOptions& /* unused */) { + // Adcance max_evicted_seq_ no more than 100 times before the cache wraps + // around. + INC_STEP_FOR_MAX_EVICTED = + std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast(1)); snapshot_cache_ = unique_ptr[]>( new std::atomic[SNAPSHOT_CACHE_SIZE] {}); commit_cache_ = @@ -275,6 +286,9 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // largetst new_max value. void AdvanceMaxEvictedSeq(SequenceNumber& prev_max, SequenceNumber& new_max); + virtual const std::vector GetSnapshotListFromDB( + SequenceNumber max); + // Update the list of snapshots corresponding to the soon-to-be-updated // max_eviceted_seq_. Thread-safety: this function can be called concurrently. // The concurrent invocations of this function is equivalent to a serial @@ -310,8 +324,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // with snapshots_mutex_ and concurrent reads are safe due to std::atomic for // each entry. In x86_64 architecture such reads are compiled to simple read // instructions. 128 entries - // TODO(myabandeh): avoid non-const static variables - static size_t DEF_SNAPSHOT_CACHE_SIZE; + static const size_t DEF_SNAPSHOT_CACHE_SIZE = static_cast(1 << 7); const size_t SNAPSHOT_CACHE_SIZE; unique_ptr[]> snapshot_cache_; // 2nd list for storing snapshots. The list sorted in ascending order. @@ -324,14 +337,19 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // A heap of prepared transactions. Thread-safety is provided with // prepared_mutex_. PreparedHeap prepared_txns_; - // TODO(myabandeh): avoid non-const static variables - static size_t DEF_COMMIT_CACHE_SIZE; + // 10m entry, 80MB size + static const size_t DEF_COMMIT_CACHE_SIZE = static_cast(1 << 21); const size_t COMMIT_CACHE_SIZE; // commit_cache_ must be initialized to zero to tell apart an empty index from // a filled one. Thread-safety is provided with commit_cache_mutex_. unique_ptr commit_cache_; // The largest evicted *commit* sequence number from the commit_cache_ std::atomic max_evicted_seq_ = {}; + // Advance max_evicted_seq_ by this value each time it needs an update. The + // larger the value, the less frequent advances we would have. We do not want + // it to be too large either as it would cause stalls by doing too much + // maintenance work under the lock. + size_t INC_STEP_FOR_MAX_EVICTED = 1; // A map of the evicted entries from commit_cache_ that has to be kept around // to service the old snapshots. This is expected to be empty normally. // Thread-safety is provided with old_commit_map_mutex_. diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index ffb4a7bc54..8abae6fa80 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -106,6 +106,32 @@ TEST(PreparedHeap, BasicsTest) { ASSERT_TRUE(heap.empty()); } +class WritePreparedTxnDBMock : public WritePreparedTxnDB { + public: + WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt) + : WritePreparedTxnDB(db_impl, opt) {} + WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt, + size_t snapshot_cache_size) + : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size) {} + WritePreparedTxnDBMock(DBImpl* db_impl, TransactionDBOptions& opt, + size_t snapshot_cache_size, size_t commit_cache_size) + : WritePreparedTxnDB(db_impl, opt, snapshot_cache_size, + commit_cache_size) {} + void SetDBSnapshots(const std::vector& snapshots) { + snapshots_ = snapshots; + } + void TakeSnapshot(SequenceNumber seq) { snapshots_.push_back(seq); } + + protected: + virtual const std::vector GetSnapshotListFromDB( + SequenceNumber /* unused */) override { + return snapshots_; + } + + private: + std::vector snapshots_; +}; + class WritePreparedTransactionTest : public TransactionTest { protected: // If expect_update is set, check if it actually updated old_commit_map_. If @@ -309,12 +335,9 @@ TEST_P(WritePreparedTransactionTest, MaybeUpdateOldCommitMap) { TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshotsTest) { std::vector snapshots = {100l, 200l, 300l, 400l, 500l, 600l, 700l}; - // will take effect after ReOpen - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = snapshots.size() / 2; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db(new WritePreparedTxnDBMock( + mock_db, txn_db_options, snapshots.size() / 2)); SequenceNumber version = 1000l; ASSERT_EQ(0, wp_db->snapshots_total_); wp_db->UpdateSnapshots(snapshots, version); @@ -353,16 +376,12 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { 60l, 70l, 80l, 90l, 100l}; SequenceNumber version = 1000l; // Choose the cache size so that the new snapshot list could replace all the - // existing items in the cache and also have some overflow Will take effect - // after ReOpen - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = (snapshots.size() - 2) / 2; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); + // existing items in the cache and also have some overflow. + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db(new WritePreparedTxnDBMock( + mock_db, txn_db_options, (snapshots.size() - 2) / 2)); // Add up to 2 items that do not fit into the cache - for (size_t old_size = 1; - old_size <= WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE + 2; + for (size_t old_size = 1; old_size <= wp_db->SNAPSHOT_CACHE_SIZE + 2; old_size++) { const std::vector old_snapshots( snapshots.begin(), snapshots.begin() + old_size); @@ -397,21 +416,17 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { // The critical part is when iterating the snapshot cache. Afterwards, // we are operating under the lock size_t a_range = - std::min(old_snapshots.size(), - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) + - 1; + std::min(old_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1; size_t b_range = - std::min(new_snapshots.size(), - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE) + - 1; + std::min(new_snapshots.size(), wp_db->SNAPSHOT_CACHE_SIZE) + 1; // Break each thread at two points for (size_t a1 = 1; a1 <= a_range; a1++) { for (size_t a2 = a1 + 1; a2 <= a_range; a2++) { for (size_t b1 = 1; b1 <= b_range; b1++) { for (size_t b2 = b1 + 1; b2 <= b_range; b2++) { - SnapshotConcurrentAccessTestInternal(wp_db, old_snapshots, - new_snapshots, entry, - version, a1, a2, b1, b2); + SnapshotConcurrentAccessTestInternal( + wp_db.get(), old_snapshots, new_snapshots, entry, version, + a1, a2, b1, b2); } } } @@ -423,16 +438,73 @@ TEST_P(WritePreparedTransactionTest, SnapshotConcurrentAccessTest) { } #endif +// This test clarifies the contract of AdvanceMaxEvictedSeq method +TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqBasicTest) { + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db( + new WritePreparedTxnDBMock(mock_db, txn_db_options)); + + // 1. Set the initial values for max, prepared, and snapshots + SequenceNumber zero_max = 0l; + // Set the initial list of prepared txns + const std::vector initial_prepared = {10, 30, 50, 100, + 150, 200, 250}; + for (auto p : initial_prepared) { + wp_db->AddPrepared(p); + } + // This updates the max value and also set old prepared + SequenceNumber init_max = 100; + wp_db->AdvanceMaxEvictedSeq(zero_max, init_max); + const std::vector initial_snapshots = {20, 40}; + wp_db->SetDBSnapshots(initial_snapshots); + // This will update the internal cache of snapshots from the DB + wp_db->UpdateSnapshots(initial_snapshots, init_max); + + // 2. Invoke AdvanceMaxEvictedSeq + const std::vector latest_snapshots = {20, 110, 220, 300}; + wp_db->SetDBSnapshots(latest_snapshots); + SequenceNumber new_max = 200; + wp_db->AdvanceMaxEvictedSeq(init_max, new_max); + + // 3. Verify that the state matches with AdvanceMaxEvictedSeq contract + // a. max should be updated to new_max + ASSERT_EQ(wp_db->max_evicted_seq_, new_max); + // b. delayed prepared should contain every txn <= max and prepared should + // only contian txns > max + auto it = initial_prepared.begin(); + for (; it != initial_prepared.end() && *it <= new_max; it++) { + ASSERT_EQ(1, wp_db->delayed_prepared_.erase(*it)); + } + ASSERT_TRUE(wp_db->delayed_prepared_.empty()); + for (; it != initial_prepared.end() && !wp_db->prepared_txns_.empty(); + it++, wp_db->prepared_txns_.pop()) { + ASSERT_EQ(*it, wp_db->prepared_txns_.top()); + } + ASSERT_TRUE(it == initial_prepared.end()); + ASSERT_TRUE(wp_db->prepared_txns_.empty()); + // c. snapshots should contain everything below new_max + auto sit = latest_snapshots.begin(); + for (size_t i = 0; sit != latest_snapshots.end() && *sit <= new_max && + i < wp_db->snapshots_total_; + sit++, i++) { + ASSERT_TRUE(i < wp_db->snapshots_total_); + // This test is in small scale and the list of snapshots are assumed to be + // within the cache size limit. This is just a safety check to double check + // that assumption. + ASSERT_TRUE(i < wp_db->SNAPSHOT_CACHE_SIZE); + ASSERT_EQ(*sit, wp_db->snapshot_cache_[i]); + } +} + // Test WritePreparedTxnDB's IsInSnapshot against different ordering of // snapshot, max_committed_seq_, prepared, and commit entries. TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { WriteOptions wo; // Use small commit cache to trigger lots of eviction and fast advance of // max_evicted_seq_ - // will take effect after ReOpen - WritePreparedTxnDB::DEF_COMMIT_CACHE_SIZE = 8; + const size_t commit_cache_size = 8; // Same for snapshot cache size - WritePreparedTxnDB::DEF_SNAPSHOT_CACHE_SIZE = 5; + const size_t snapshot_cache_size = 5; // Take some preliminary snapshots first. This is to stress the data structure // that holds the old snapshots as it will be designed to be efficient when @@ -452,7 +524,6 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { uint64_t cur_txn = 0; // Number of snapshots taken so far int num_snapshots = 0; - std::vector to_be_released; // Number of gaps applied so far int gap_cnt = 0; // The final snapshot that we will inspect @@ -465,30 +536,23 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { // We keep the list of txns comitted before we take the last snaphot. // These should be the only seq numbers that will be found in the snapshot std::set committed_before; - ReOpen(); // to restart the db - WritePreparedTxnDB* wp_db = dynamic_cast(db); - assert(wp_db); - assert(wp_db->db_impl_); + DBImpl* mock_db = new DBImpl(options, dbname); + std::unique_ptr wp_db(new WritePreparedTxnDBMock( + mock_db, txn_db_options, snapshot_cache_size, commit_cache_size)); // We continue until max advances a bit beyond the snapshot. while (!snapshot || wp_db->max_evicted_seq_ < snapshot + 100) { // do prepare for a transaction - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); wp_db->AddPrepared(seq); prepared.insert(seq); // If cur_txn is not started, do prepare for it. if (!cur_txn) { - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); cur_txn = seq; wp_db->AddPrepared(cur_txn); } else { // else commit it - wp_db->db_impl_->Put(wo, "key", "value"); // dummy put to inc db seq seq++; - ASSERT_EQ(wp_db->db_impl_->GetLatestSequenceNumber(), seq); wp_db->AddCommitted(cur_txn, seq); if (!snapshot) { committed_before.insert(cur_txn); @@ -498,20 +562,15 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { if (num_snapshots < max_snapshots - 1) { // Take preliminary snapshots - auto tmp_snapshot = db->GetSnapshot(); - to_be_released.push_back(tmp_snapshot); + wp_db->TakeSnapshot(seq); num_snapshots++; } else if (gap_cnt < max_gap) { // Wait for some gap before taking the final snapshot gap_cnt++; } else if (!snapshot) { // Take the final snapshot if it is not already taken - auto tmp_snapshot = db->GetSnapshot(); - to_be_released.push_back(tmp_snapshot); - snapshot = tmp_snapshot->GetSequenceNumber(); - // We increase the db seq artificailly by a dummy Put. Check that this - // technique is effective and db seq is that same as ours. - ASSERT_EQ(snapshot, seq); + snapshot = seq; + wp_db->TakeSnapshot(snapshot); num_snapshots++; } @@ -548,9 +607,6 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) { } ASSERT_TRUE(wp_db->delayed_prepared_.empty()); ASSERT_TRUE(wp_db->prepared_txns_.empty()); - for (auto s : to_be_released) { - db->ReleaseSnapshot(s); - } } } }