diff --git a/db/db_impl.cc b/db/db_impl.cc index 4a55ad7e3d..eeb1670dec 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1374,7 +1374,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, snapshot = reinterpret_cast(read_options.snapshot)->number_; if (callback) { - snapshot = std::max(snapshot, callback->MaxUnpreparedSequenceNumber()); + snapshot = std::max(snapshot, callback->max_visible_seq()); } } else { // Since we get and reference the super version before getting diff --git a/db/db_iter.cc b/db/db_iter.cc index 0f29354aee..541a5fbed9 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -238,7 +238,12 @@ class DBIter final: public Iterator { void SeekToFirst() override; void SeekToLast() override; Env* env() { return env_; } - void set_sequence(uint64_t s) { sequence_ = s; } + void set_sequence(uint64_t s) { + sequence_ = s; + if (read_callback_) { + read_callback_->Refresh(s); + } + } void set_valid(bool v) { valid_ = v; } private: @@ -258,7 +263,7 @@ class DBIter final: public Iterator { void PrevInternal(); bool TooManyInternalKeysSkipped(bool increment = true); - bool IsVisible(SequenceNumber sequence); + inline bool IsVisible(SequenceNumber sequence); // CanReseekToSkip() returns whether the iterator can use the optimization // where it reseek by sequence number to get the next key when there are too @@ -266,12 +271,6 @@ class DBIter final: public Iterator { // sequence number does not guarantee that it is visible. inline bool CanReseekToSkip(); - // MaxVisibleSequenceNumber() returns the maximum visible sequence number - // for this snapshot. This sequence number may be greater than snapshot - // seqno because uncommitted data written to DB for write unprepared will - // have a higher sequence number. 
- inline SequenceNumber MaxVisibleSequenceNumber(); - // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() // is called void TempPinData() { @@ -311,6 +310,8 @@ class DBIter final: public Iterator { const MergeOperator* const merge_operator_; InternalIterator* iter_; ReadCallback* read_callback_; + // Max visible sequence number. It is normally the snapshot seq unless we have + // uncommitted data in db as in WriteUnprepared. SequenceNumber sequence_; IterKey saved_key_; @@ -1246,21 +1247,15 @@ bool DBIter::TooManyInternalKeysSkipped(bool increment) { } bool DBIter::IsVisible(SequenceNumber sequence) { - return sequence <= MaxVisibleSequenceNumber() && - (read_callback_ == nullptr || read_callback_->IsVisible(sequence)); + if (read_callback_ == nullptr) { + return sequence <= sequence_; + } else { + return read_callback_->IsVisible(sequence); + } } bool DBIter::CanReseekToSkip() { - return read_callback_ == nullptr || - read_callback_->MaxUnpreparedSequenceNumber() == 0; -} - -SequenceNumber DBIter::MaxVisibleSequenceNumber() { - if (read_callback_ == nullptr) { - return sequence_; - } - - return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber()); + return read_callback_ == nullptr || read_callback_->CanReseekToSkip(); } void DBIter::Seek(const Slice& target) { @@ -1270,7 +1265,7 @@ void DBIter::Seek(const Slice& target) { ReleaseTempPinnedData(); ResetInternalKeysSkippedCounter(); - SequenceNumber seq = MaxVisibleSequenceNumber(); + SequenceNumber seq = sequence_; saved_key_.Clear(); saved_key_.SetInternalKey(target, seq); @@ -1556,6 +1551,9 @@ Status ArenaWrappedDBIter::Refresh() { new (&arena_) Arena(); SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); + if (read_callback_) { + read_callback_->Refresh(latest_seq); + } Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, cur_sv_number, read_callback_, db_impl_, 
cfd_, allow_blob_, diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index e24b7c3b7e..8eee3c0ce8 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -20,7 +20,10 @@ namespace rocksdb { // A dumb ReadCallback which saying every key is committed. class DummyReadCallback : public ReadCallback { + public: + DummyReadCallback() : ReadCallback(kMaxSequenceNumber) {} bool IsVisibleFullCheck(SequenceNumber /*seq*/) override { return true; } + void SetSnapshot(SequenceNumber seq) { max_visible_seq_ = seq; } }; // Test param: @@ -39,6 +42,7 @@ class DBIteratorTest : public DBTestBase, SequenceNumber seq = read_options.snapshot != nullptr ? read_options.snapshot->GetSequenceNumber() : db_->GetLatestSequenceNumber(); + read_callback_.SetSnapshot(seq); bool use_read_callback = GetParam(); ReadCallback* read_callback = use_read_callback ? &read_callback_ : nullptr; return dbfull()->NewIteratorImpl(read_options, cfd, seq, read_callback); @@ -2476,15 +2480,12 @@ class DBIteratorWithReadCallbackTest : public DBIteratorTest {}; TEST_F(DBIteratorWithReadCallbackTest, ReadCallback) { class TestReadCallback : public ReadCallback { public: - explicit TestReadCallback(SequenceNumber last_visible_seq) - : last_visible_seq_(last_visible_seq) {} + explicit TestReadCallback(SequenceNumber max_visible_seq) + : ReadCallback(max_visible_seq) {} bool IsVisibleFullCheck(SequenceNumber seq) override { - return seq <= last_visible_seq_; + return seq <= max_visible_seq_; } - - private: - SequenceNumber last_visible_seq_; }; ASSERT_OK(Put("foo", "v1")); diff --git a/db/db_merge_operator_test.cc b/db/db_merge_operator_test.cc index 59bd84804e..2b5e4a445e 100644 --- a/db/db_merge_operator_test.cc +++ b/db/db_merge_operator_test.cc @@ -18,7 +18,9 @@ class TestReadCallback : public ReadCallback { public: TestReadCallback(SnapshotChecker* snapshot_checker, SequenceNumber snapshot_seq) - : snapshot_checker_(snapshot_checker), snapshot_seq_(snapshot_seq) {} + : 
ReadCallback(snapshot_seq), + snapshot_checker_(snapshot_checker), + snapshot_seq_(snapshot_seq) {} bool IsVisibleFullCheck(SequenceNumber seq) override { return snapshot_checker_->CheckInSnapshot(seq, snapshot_seq_) == diff --git a/db/db_test2.cc b/db/db_test2.cc index b62a1d5773..9a351dac07 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2729,7 +2729,8 @@ TEST_F(DBTest2, ReadCallbackTest) { class TestReadCallback : public ReadCallback { public: - explicit TestReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {} + explicit TestReadCallback(SequenceNumber snapshot) + : ReadCallback(snapshot), snapshot_(snapshot) {} bool IsVisibleFullCheck(SequenceNumber seq) override { return seq <= snapshot_; } diff --git a/db/read_callback.h b/db/read_callback.h index 1880aeb2d0..52573be19d 100644 --- a/db/read_callback.h +++ b/db/read_callback.h @@ -11,10 +11,10 @@ namespace rocksdb { class ReadCallback { public: - ReadCallback() {} - ReadCallback(SequenceNumber snapshot) : snapshot_(snapshot) {} - ReadCallback(SequenceNumber snapshot, SequenceNumber min_uncommitted) - : snapshot_(snapshot), min_uncommitted_(min_uncommitted) {} + ReadCallback(SequenceNumber last_visible_seq) + : max_visible_seq_(last_visible_seq) {} + ReadCallback(SequenceNumber last_visible_seq, SequenceNumber min_uncommitted) + : max_visible_seq_(last_visible_seq), min_uncommitted_(min_uncommitted) {} virtual ~ReadCallback() {} @@ -23,30 +23,33 @@ class ReadCallback { virtual bool IsVisibleFullCheck(SequenceNumber seq) = 0; inline bool IsVisible(SequenceNumber seq) { - if (seq == 0 || seq < min_uncommitted_) { - assert(seq <= snapshot_); + assert(min_uncommitted_ > 0); + assert(min_uncommitted_ >= kMinUnCommittedSeq); + if (seq < min_uncommitted_) { // handles seq == 0 as well + assert(seq <= max_visible_seq_); return true; - } else if (snapshot_ < seq) { + } else if (max_visible_seq_ < seq) { + assert(seq != 0); return false; } else { + assert(seq != 0); // already handled in the first if-then 
clause return IsVisibleFullCheck(seq); } } - // This is called to determine the maximum visible sequence number for the - // current transaction for read-your-own-write semantics. This is so that - // for write unprepared, we will not skip keys that are written by the - // current transaction with the seek to snapshot optimization. - // - // For other uses, this returns zero, meaning that the current snapshot - // sequence number is the maximum visible sequence number. - inline virtual SequenceNumber MaxUnpreparedSequenceNumber() { return 0; }; + inline SequenceNumber max_visible_seq() { return max_visible_seq_; } + + virtual void Refresh(SequenceNumber seq) { max_visible_seq_ = seq; } + + // Refer to DBIter::CanReseekToSkip + virtual bool CanReseekToSkip() { return true; } protected: - // The snapshot at which the read is performed. - const SequenceNumber snapshot_ = kMaxSequenceNumber; + // The max visible seq, it is usually the snapshot but could be larger if + // transaction has its own writes written to db. + SequenceNumber max_visible_seq_ = kMaxSequenceNumber; // Any seq less than min_uncommitted_ is committed. - const SequenceNumber min_uncommitted_ = 0; + const SequenceNumber min_uncommitted_ = kMinUnCommittedSeq; }; } // namespace rocksdb diff --git a/db/snapshot_impl.h b/db/snapshot_impl.h index 3483a26c6f..f2610fd18b 100644 --- a/db/snapshot_impl.h +++ b/db/snapshot_impl.h @@ -24,7 +24,7 @@ class SnapshotImpl : public Snapshot { // It indicates the smallest uncommitted data at the time the snapshot was // taken. This is currently used by WritePrepared transactions to limit the // scope of queries to IsInSnpashot. 
- SequenceNumber min_uncommitted_ = 0; + SequenceNumber min_uncommitted_ = kMinUnCommittedSeq; virtual SequenceNumber GetSequenceNumber() const override { return number_; } diff --git a/include/rocksdb/types.h b/include/rocksdb/types.h index d2022d33dc..2cd4039bd7 100644 --- a/include/rocksdb/types.h +++ b/include/rocksdb/types.h @@ -15,6 +15,8 @@ namespace rocksdb { // Represents a sequence number in a WAL file. typedef uint64_t SequenceNumber; +const SequenceNumber kMinUnCommittedSeq = 1; // 0 is always committed + // User-oriented representation of internal key types. enum EntryType { kEntryPut, diff --git a/utilities/transactions/snapshot_checker.cc b/utilities/transactions/snapshot_checker.cc index cfe51cb1be..695c020ef7 100644 --- a/utilities/transactions/snapshot_checker.cc +++ b/utilities/transactions/snapshot_checker.cc @@ -35,7 +35,7 @@ SnapshotCheckerResult WritePreparedSnapshotChecker::CheckInSnapshot( bool snapshot_released = false; // TODO(myabandeh): set min_uncommitted bool in_snapshot = txn_db_->IsInSnapshot( - sequence, snapshot_sequence, 0 /*min_uncommitted*/, &snapshot_released); + sequence, snapshot_sequence, kMinUnCommittedSeq, &snapshot_released); if (snapshot_released) { return SnapshotCheckerResult::kSnapshotReleased; } diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc index 1de25addae..c0f5a10682 100644 --- a/utilities/transactions/write_prepared_transaction_test.cc +++ b/utilities/transactions/write_prepared_transaction_test.cc @@ -1262,6 +1262,7 @@ TEST_P(WritePreparedTransactionTest, AdvanceSeqByOne) { TEST_P(WritePreparedTransactionTest, TxnInitialize) { TransactionOptions txn_options; WriteOptions write_options; + ASSERT_OK(db->Put(write_options, "key", "value")); Transaction* txn0 = db->BeginTransaction(write_options, txn_options); ASSERT_OK(txn0->SetName("xid")); ASSERT_OK(txn0->Put(Slice("key"), Slice("value1"))); @@ -1274,7 +1275,7 @@ 
TEST_P(WritePreparedTransactionTest, TxnInitialize) { auto snap_impl = reinterpret_cast(snap); // If ::Initialize calls the overriden SetSnapshot, min_uncommitted_ must be // udpated - ASSERT_GT(snap_impl->min_uncommitted_, 0); + ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq); txn0->Rollback(); txn1->Rollback(); @@ -1679,7 +1680,7 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) { size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq; wp_db->AddCommitted(overwrite_seq, overwrite_seq); SequenceNumber snap_seq; - uint64_t min_uncommitted = 0; + uint64_t min_uncommitted = kMinUnCommittedSeq; bool released; released = false; diff --git a/utilities/transactions/write_prepared_txn.cc b/utilities/transactions/write_prepared_txn.cc index c5f8cdd8cc..98eee11f7d 100644 --- a/utilities/transactions/write_prepared_txn.cc +++ b/utilities/transactions/write_prepared_txn.cc @@ -50,7 +50,8 @@ Status WritePreparedTxn::Get(const ReadOptions& read_options, auto snapshot = read_options.snapshot; auto snap_seq = snapshot != nullptr ? 
snapshot->GetSequenceNumber() : kMaxSequenceNumber; - SequenceNumber min_uncommitted = 0; // by default disable the optimization + SequenceNumber min_uncommitted = + kMinUnCommittedSeq; // by default disable the optimization if (snapshot != nullptr) { min_uncommitted = static_cast_with_check(snapshot) @@ -235,8 +236,7 @@ Status WritePreparedTxn::RollbackInternal() { std::map& handles, bool rollback_merge_operands) : db_(db), - callback(wpt_db, snap_seq, - 0), // 0 disables min_uncommitted optimization + callback(wpt_db, snap_seq), // disable min_uncommitted optimization rollback_batch_(dst_batch), comparators_(comparators), handles_(handles), diff --git a/utilities/transactions/write_prepared_txn.h b/utilities/transactions/write_prepared_txn.h index 46c114c742..2cd729cd2c 100644 --- a/utilities/transactions/write_prepared_txn.h +++ b/utilities/transactions/write_prepared_txn.h @@ -53,9 +53,11 @@ class WritePreparedTxn : public PessimisticTransaction { ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) override; - // To make WAL commit markers visible, the snapshot will be based on the last - // seq in the WAL that is also published, LastPublishedSequence, as opposed to - // the last seq in the memtable. + // Note: The behavior is undefined in presence of interleaved writes to the + // same transaction. + // To make WAL commit markers visible, the snapshot will be + // based on the last seq in the WAL that is also published, + // LastPublishedSequence, as opposed to the last seq in the memtable. 
using Transaction::GetIterator; virtual Iterator* GetIterator(const ReadOptions& options) override; virtual Iterator* GetIterator(const ReadOptions& options, diff --git a/utilities/transactions/write_prepared_txn_db.h b/utilities/transactions/write_prepared_txn_db.h index bc284029ac..52d07a791a 100644 --- a/utilities/transactions/write_prepared_txn_db.h +++ b/utilities/transactions/write_prepared_txn_db.h @@ -110,12 +110,13 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { // If the snapshot_seq is already released and snapshot_seq <= max, sets // *snap_released to true and returns true as well. inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq, - uint64_t min_uncommitted = 0, + uint64_t min_uncommitted = kMinUnCommittedSeq, bool* snap_released = nullptr) const { ROCKS_LOG_DETAILS(info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " min_uncommitted %" PRIu64, prep_seq, snapshot_seq, min_uncommitted); + assert(min_uncommitted >= kMinUnCommittedSeq); // Caller is responsible to initialize snap_released. assert(snap_released == nullptr || *snap_released == false); // Here we try to infer the return value without looking into prepare list. @@ -730,6 +731,8 @@ class WritePreparedTxnDB : public PessimisticTransactionDB { class WritePreparedTxnReadCallback : public ReadCallback { public: + WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) + : ReadCallback(snapshot), db_(db) {} WritePreparedTxnReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, SequenceNumber min_uncommitted) : ReadCallback(snapshot, min_uncommitted), db_(db) {} @@ -737,9 +740,11 @@ class WritePreparedTxnReadCallback : public ReadCallback { // Will be called to see if the seq number visible; if not it moves on to // the next seq number. 
inline virtual bool IsVisibleFullCheck(SequenceNumber seq) override { - return db_->IsInSnapshot(seq, snapshot_, min_uncommitted_); + auto snapshot = max_visible_seq_; + return db_->IsInSnapshot(seq, snapshot, min_uncommitted_); } + // TODO(myabandeh): override Refresh when Iterator::Refresh is supported private: WritePreparedTxnDB* db_; }; diff --git a/utilities/transactions/write_unprepared_transaction_test.cc b/utilities/transactions/write_unprepared_transaction_test.cc index 009991bb7c..9aee33b078 100644 --- a/utilities/transactions/write_unprepared_transaction_test.cc +++ b/utilities/transactions/write_unprepared_transaction_test.cc @@ -81,12 +81,12 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { ReadOptions roptions; roptions.snapshot = snapshot0; + wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = + snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); auto iter = txn->GetIterator(roptions); // Test Get(). std::string value; - wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = - snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); ASSERT_EQ(value, "v3"); @@ -96,6 +96,8 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); ASSERT_OK(txn->Get(roptions, Slice("a"), &value)); ASSERT_EQ(value, "v7"); @@ -108,6 +110,8 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { // Test Next(). 
wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); iter->Seek("a"); verify_state(iter, "a", "v3"); @@ -123,6 +127,8 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); iter->Seek("a"); verify_state(iter, "a", "v7"); @@ -143,11 +149,11 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { // // Because of row locks and ValidateSnapshot, there cannot be any committed // entries after snapshot, but before the first prepared key. - delete iter; roptions.snapshot = snapshot2; - iter = txn->GetIterator(roptions); wup_txn->unprep_seqs_[snapshot2->GetSequenceNumber() + 1] = snapshot4->GetSequenceNumber() - snapshot2->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); iter->SeekForPrev("b"); verify_state(iter, "b", "v4"); @@ -161,11 +167,11 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) { iter->Prev(); verify_state(iter, "a", "v3"); - delete iter; roptions.snapshot = snapshot6; - iter = txn->GetIterator(roptions); wup_txn->unprep_seqs_[snapshot6->GetSequenceNumber() + 1] = snapshot8->GetSequenceNumber() - snapshot6->GetSequenceNumber(); + delete iter; + iter = txn->GetIterator(roptions); iter->SeekForPrev("b"); verify_state(iter, "b", "v8"); diff --git a/utilities/transactions/write_unprepared_txn.cc b/utilities/transactions/write_unprepared_txn.cc index f72589fad2..027fe7368b 100644 --- a/utilities/transactions/write_unprepared_txn.cc +++ b/utilities/transactions/write_unprepared_txn.cc @@ -34,12 +34,12 @@ bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) { return db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_); } -SequenceNumber WriteUnpreparedTxnReadCallback::MaxUnpreparedSequenceNumber() 
{ - auto unprep_seqs = txn_->GetUnpreparedSequenceNumbers(); +SequenceNumber WriteUnpreparedTxnReadCallback::CalcMaxUnpreparedSequenceNumber( + WriteUnpreparedTxn* txn) { + auto unprep_seqs = txn->GetUnpreparedSequenceNumbers(); if (unprep_seqs.size()) { return unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1; } - return 0; } @@ -379,7 +379,7 @@ Status WriteUnpreparedTxn::RollbackInternal() { // Note that we do not use WriteUnpreparedTxnReadCallback because we do not // need to read our own writes when reading prior versions of the key for // rollback. - WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq, 0); + WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq); for (const auto& cfkey : write_set_keys_) { const auto cfid = cfkey.first; const auto& keys = cfkey.second; @@ -475,7 +475,8 @@ Status WriteUnpreparedTxn::Get(const ReadOptions& options, auto snapshot = options.snapshot; auto snap_seq = snapshot != nullptr ? snapshot->GetSequenceNumber() : kMaxSequenceNumber; - SequenceNumber min_uncommitted = 0; // by default disable the optimization + SequenceNumber min_uncommitted = + kMinUnCommittedSeq; // by default disable the optimization if (snapshot != nullptr) { min_uncommitted = static_cast_with_check(snapshot) diff --git a/utilities/transactions/write_unprepared_txn.h b/utilities/transactions/write_unprepared_txn.h index c962a0b0dc..4a00406d7b 100644 --- a/utilities/transactions/write_unprepared_txn.h +++ b/utilities/transactions/write_unprepared_txn.h @@ -23,17 +23,33 @@ class WriteUnpreparedTxnReadCallback : public ReadCallback { SequenceNumber snapshot, SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) - // Disable snapshot check on parent class since it would violate - // read-your-own-write guarantee. - : ReadCallback(kMaxSequenceNumber, min_uncommitted), + // Pass our last uncommitted seq as the snapshot to the parent class to + // ensure that the parent will not prematurely filter out own writes. 
We + // will do the exact comparison against snapshots in IsVisibleFullCheck + // override. + : ReadCallback(CalcMaxVisibleSeq(txn, snapshot), min_uncommitted), db_(db), txn_(txn), wup_snapshot_(snapshot) {} virtual bool IsVisibleFullCheck(SequenceNumber seq) override; - virtual SequenceNumber MaxUnpreparedSequenceNumber() override; + bool CanReseekToSkip() override { + return wup_snapshot_ == max_visible_seq_; + // Otherwise our own uncommitted writes are in db, and the assumptions + // behind reseek optimizations are no longer valid. + } + + // TODO(myabandeh): override Refresh when Iterator::Refresh is supported private: + SequenceNumber CalcMaxVisibleSeq(WriteUnpreparedTxn* txn, + SequenceNumber snapshot_seq) { + SequenceNumber max_unprepared = CalcMaxUnpreparedSequenceNumber(txn); + assert(snapshot_seq < max_unprepared || max_unprepared == 0 || + snapshot_seq == kMaxSequenceNumber); + return std::max(max_unprepared, snapshot_seq); + } + SequenceNumber CalcMaxUnpreparedSequenceNumber(WriteUnpreparedTxn* txn); WritePreparedTxnDB* db_; WriteUnpreparedTxn* txn_; SequenceNumber wup_snapshot_; diff --git a/utilities/transactions/write_unprepared_txn_db.cc b/utilities/transactions/write_unprepared_txn_db.cc index 73f51c84b9..8c67093213 100644 --- a/utilities/transactions/write_unprepared_txn_db.cc +++ b/utilities/transactions/write_unprepared_txn_db.cc @@ -31,17 +31,17 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( class InvalidSnapshotReadCallback : public ReadCallback { public: - InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot, - SequenceNumber min_uncommitted) - : ReadCallback(snapshot, min_uncommitted), db_(db) {} + InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot) + : ReadCallback(snapshot), db_(db) {} // Will be called to see if the seq number visible; if not it moves on to // the next seq number. 
inline bool IsVisibleFullCheck(SequenceNumber seq) override { // Becomes true if it cannot tell by comparing seq with snapshot seq since - // the snapshot_ is not a real snapshot. + // the snapshot is not a real snapshot. + auto snapshot = max_visible_seq_; bool released = false; - auto ret = db_->IsInSnapshot(seq, snapshot_, min_uncommitted_, &released); + auto ret = db_->IsInSnapshot(seq, snapshot, min_uncommitted_, &released); assert(!released || ret); return ret; } @@ -73,8 +73,8 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( std::map& handles, bool rollback_merge_operands) : db_(db), - callback(wpt_db, snap_seq, - 0), // 0 disables min_uncommitted optimization + callback(wpt_db, snap_seq), + // disable min_uncommitted optimization rollback_batch_(dst_batch), comparators_(comparators), handles_(handles), @@ -354,6 +354,7 @@ struct WriteUnpreparedTxnDB::IteratorState { std::shared_ptr s, SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) : callback(txn_db, sequence, min_uncommitted, txn), snapshot(s) {} + SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); } WriteUnpreparedTxnReadCallback callback; std::shared_ptr snapshot; @@ -395,8 +396,8 @@ Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& options, auto* state = new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn); auto* db_iter = - db_impl_->NewIteratorImpl(options, cfd, snapshot_seq, &state->callback, - !ALLOW_BLOB, !ALLOW_REFRESH); + db_impl_->NewIteratorImpl(options, cfd, state->MaxVisibleSeq(), + &state->callback, !ALLOW_BLOB, !ALLOW_REFRESH); db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr); return db_iter; }