mirror of https://github.com/facebook/rocksdb.git
WritePrepared: Report released snapshots in IsInSnapshot (#4856)
Summary: Previously IsInSnapshot assumed that the snapshot is valid at the time that the function is called. However there are cases where that might not be valid. Example is background compactions where the compaction algorithm operates with a list of snapshots some of which might be released by the time they are being passed to IsInSnapshot. The patch make two changes to enable the caller to tell difference: i) any live snapshot below max is added to max_committed_seq_, which allows IsInSnapshot to confidently tell whether the passed snapshot is invalid if it below max, ii) extends IsInSnapshot API with a "released" variable that is set true when IsInSnapshot find no such snapshot below max and also find no other way to give a certain return value. In such cases the return value is true but the caller should also check the "released" boolean after the call. In short here is the changes in the API: i) If the snapshot is valid, no change is required. ii) If the snapshot might be invalid, a reference to "released" boolean must be passed to IsInSnapshot. ii-a) If snapshot is above max, IsInSnapshot can figure the return valid using the commit cache. ii-b) otherwise if snapshot is in old_commit_map_, IsInSnapshot can use that to tell if value was visible to the snapshot. ii-c) otherwise it sets "released" to true and returns true as well. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4856 Differential Revision: D13599847 Pulled By: maysamyabandeh fbshipit-source-id: 1752be28667f886a1efec8cae5714b9b7a8f1e0f
This commit is contained in:
parent
8641e9adf7
commit
f3a99e8a4d
|
@ -1484,6 +1484,80 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMapTest) {
|
|||
}
|
||||
}
|
||||
|
||||
// Shows the contract of IsInSnapshot when called on invalid/released snapshots
|
||||
TEST_P(WritePreparedTransactionTest, IsInSnapshotReleased) {
|
||||
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
|
||||
WriteOptions woptions;
|
||||
ASSERT_OK(db->Put(woptions, "key", "value"));
|
||||
// snap seq = 1
|
||||
const Snapshot* snap1 = db->GetSnapshot();
|
||||
ASSERT_OK(db->Put(woptions, "key", "value"));
|
||||
ASSERT_OK(db->Put(woptions, "key", "value"));
|
||||
// snap seq = 3
|
||||
const Snapshot* snap2 = db->GetSnapshot();
|
||||
const SequenceNumber seq = 1;
|
||||
// Evict seq out of commit cache
|
||||
size_t overwrite_seq = wp_db->COMMIT_CACHE_SIZE + seq;
|
||||
wp_db->AddCommitted(overwrite_seq, overwrite_seq);
|
||||
SequenceNumber snap_seq;
|
||||
uint64_t min_uncommitted = 0;
|
||||
bool released;
|
||||
|
||||
released = false;
|
||||
snap_seq = snap1->GetSequenceNumber();
|
||||
ASSERT_LE(seq, snap_seq);
|
||||
// Valid snapshot lower than max
|
||||
ASSERT_LE(snap_seq, wp_db->max_evicted_seq_);
|
||||
ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
|
||||
ASSERT_FALSE(released);
|
||||
|
||||
released = false;
|
||||
snap_seq = snap1->GetSequenceNumber();
|
||||
// Invaid snapshot lower than max
|
||||
ASSERT_LE(snap_seq + 1, wp_db->max_evicted_seq_);
|
||||
ASSERT_TRUE(
|
||||
wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
|
||||
ASSERT_TRUE(released);
|
||||
|
||||
db->ReleaseSnapshot(snap1);
|
||||
|
||||
released = false;
|
||||
// Released snapshot lower than max
|
||||
ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
|
||||
// The release does not take affect until the next max advance
|
||||
ASSERT_FALSE(released);
|
||||
|
||||
released = false;
|
||||
// Invaid snapshot lower than max
|
||||
ASSERT_TRUE(
|
||||
wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
|
||||
ASSERT_TRUE(released);
|
||||
|
||||
// This make the snapshot release to reflect in txn db structures
|
||||
wp_db->AdvanceMaxEvictedSeq(wp_db->max_evicted_seq_,
|
||||
wp_db->max_evicted_seq_ + 1);
|
||||
|
||||
released = false;
|
||||
// Released snapshot lower than max
|
||||
ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
|
||||
ASSERT_TRUE(released);
|
||||
|
||||
released = false;
|
||||
// Invaid snapshot lower than max
|
||||
ASSERT_TRUE(
|
||||
wp_db->IsInSnapshot(seq, snap_seq + 1, min_uncommitted, &released));
|
||||
ASSERT_TRUE(released);
|
||||
|
||||
snap_seq = snap2->GetSequenceNumber();
|
||||
|
||||
released = false;
|
||||
// Unreleased snapshot lower than max
|
||||
ASSERT_TRUE(wp_db->IsInSnapshot(seq, snap_seq, min_uncommitted, &released));
|
||||
ASSERT_FALSE(released);
|
||||
|
||||
db->ReleaseSnapshot(snap2);
|
||||
}
|
||||
|
||||
// Test WritePreparedTxnDB's IsInSnapshot against different ordering of
|
||||
// snapshot, max_committed_seq_, prepared, and commit entries.
|
||||
TEST_P(WritePreparedTransactionTest, IsInSnapshotTest) {
|
||||
|
|
|
@ -535,6 +535,15 @@ void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
|
|||
}
|
||||
if (update_snapshots) {
|
||||
UpdateSnapshots(snapshots, new_snapshots_version);
|
||||
if (!snapshots.empty()) {
|
||||
WriteLock wl(&old_commit_map_mutex_);
|
||||
for (auto snap : snapshots) {
|
||||
// This allows IsInSnapshot to tell apart the reads from in valid
|
||||
// snapshots from the reads from committed values in valid snapshots.
|
||||
old_commit_map_[snap];
|
||||
}
|
||||
old_commit_map_empty_.store(false, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
auto updated_prev_max = prev_max;
|
||||
while (updated_prev_max < new_max &&
|
||||
|
|
|
@ -115,8 +115,11 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
|||
// Check whether the transaction that wrote the value with sequence number seq
|
||||
// is visible to the snapshot with sequence number snapshot_seq.
|
||||
// Returns true if commit_seq <= snapshot_seq
|
||||
// If the snapshot_seq is already released and snapshot_seq <= max, sets
|
||||
// *snap_released to true and returns true as well.
|
||||
inline bool IsInSnapshot(uint64_t prep_seq, uint64_t snapshot_seq,
|
||||
uint64_t min_uncommitted = 0) const {
|
||||
uint64_t min_uncommitted = 0,
|
||||
bool* snap_released = nullptr) const {
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" min_uncommitted %" PRIu64,
|
||||
|
@ -210,9 +213,15 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
|||
// snapshot. If there was no overlapping commit entry, then it is committed
|
||||
// with a commit_seq lower than any live snapshot, including snapshot_seq.
|
||||
if (old_commit_map_empty_.load(std::memory_order_acquire)) {
|
||||
ROCKS_LOG_DETAILS(
|
||||
info_log_, "IsInSnapshot %" PRIu64 " in %" PRIu64 " returns %" PRId32,
|
||||
prep_seq, snapshot_seq, 1);
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" returns %" PRId32 " released=1",
|
||||
prep_seq, snapshot_seq, 0);
|
||||
assert(snap_released);
|
||||
// This snapshot is not valid anymore. We cannot tell if prep_seq is
|
||||
// committed before or after the snapshot. Return true but also set
|
||||
// snap_released to true.
|
||||
*snap_released = true;
|
||||
return true;
|
||||
}
|
||||
{
|
||||
|
@ -226,7 +235,20 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
|||
if (found) {
|
||||
auto& vec = prep_set_entry->second;
|
||||
found = std::binary_search(vec.begin(), vec.end(), prep_seq);
|
||||
} else {
|
||||
// coming from compaction
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
" returns %" PRId32 " released=1",
|
||||
prep_seq, snapshot_seq, 0);
|
||||
// This snapshot is not valid anymore. We cannot tell if prep_seq is
|
||||
// committed before or after the snapshot. Return true but also set
|
||||
// snap_released to true.
|
||||
assert(snap_released);
|
||||
*snap_released = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!found) {
|
||||
ROCKS_LOG_DETAILS(info_log_,
|
||||
"IsInSnapshot %" PRIu64 " in %" PRIu64
|
||||
|
@ -379,6 +401,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
|
|||
friend class WritePreparedTransactionTest_BasicRecoveryTest_Test;
|
||||
friend class WritePreparedTransactionTest_DoubleSnapshot_Test;
|
||||
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
|
||||
friend class WritePreparedTransactionTest_IsInSnapshotReleased_Test;
|
||||
friend class WritePreparedTransactionTest_OldCommitMapGC_Test;
|
||||
friend class WritePreparedTransactionTest_RollbackTest_Test;
|
||||
friend class WriteUnpreparedTxnDB;
|
||||
|
|
|
@ -372,15 +372,14 @@ Status WriteUnpreparedTxn::RollbackInternal() {
|
|||
assert(GetId() != kMaxSequenceNumber);
|
||||
assert(GetId() > 0);
|
||||
const auto& cf_map = *wupt_db_->GetCFHandleMap();
|
||||
// In WritePrepared, the txn is is the same as prepare seq
|
||||
auto last_visible_txn = GetId() - 1;
|
||||
auto read_at_seq = kMaxSequenceNumber;
|
||||
Status s;
|
||||
|
||||
ReadOptions roptions;
|
||||
// Note that we do not use WriteUnpreparedTxnReadCallback because we do not
|
||||
// need to read our own writes when reading prior versions of the key for
|
||||
// rollback.
|
||||
WritePreparedTxnReadCallback callback(wpt_db_, last_visible_txn, 0);
|
||||
WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq, 0);
|
||||
for (const auto& cfkey : write_set_keys_) {
|
||||
const auto cfid = cfkey.first;
|
||||
const auto& keys = cfkey.second;
|
||||
|
|
|
@ -29,6 +29,29 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
|
|||
// rollback batch.
|
||||
w_options.disableWAL = true;
|
||||
|
||||
class InvalidSnapshotReadCallback : public ReadCallback {
|
||||
public:
|
||||
InvalidSnapshotReadCallback(WritePreparedTxnDB* db, SequenceNumber snapshot,
|
||||
SequenceNumber min_uncommitted)
|
||||
: db_(db), snapshot_(snapshot), min_uncommitted_(min_uncommitted) {}
|
||||
|
||||
// Will be called to see if the seq number visible; if not it moves on to
|
||||
// the next seq number.
|
||||
inline virtual bool IsVisible(SequenceNumber seq) override {
|
||||
// Becomes true if it cannot tell by comparing seq with snapshot seq since
|
||||
// the snapshot_ is not a real snapshot.
|
||||
bool released = false;
|
||||
auto ret = db_->IsInSnapshot(seq, snapshot_, min_uncommitted_, &released);
|
||||
assert(!released || ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
private:
|
||||
WritePreparedTxnDB* db_;
|
||||
SequenceNumber snapshot_;
|
||||
SequenceNumber min_uncommitted_;
|
||||
};
|
||||
|
||||
// Iterate starting with largest sequence number.
|
||||
for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); it++) {
|
||||
auto last_visible_txn = it->first - 1;
|
||||
|
@ -38,7 +61,7 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
|
|||
struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
|
||||
DBImpl* db_;
|
||||
ReadOptions roptions;
|
||||
WritePreparedTxnReadCallback callback;
|
||||
InvalidSnapshotReadCallback callback;
|
||||
WriteBatch* rollback_batch_;
|
||||
std::map<uint32_t, const Comparator*>& comparators_;
|
||||
std::map<uint32_t, ColumnFamilyHandle*>& handles_;
|
||||
|
|
Loading…
Reference in New Issue