mirror of https://github.com/facebook/rocksdb.git
Fix shared lock upgrades
Summary: Upgrading a shared lock was silently succeeding because the actual locking code was skipped. This is because if the keys are tracked, it is assumed that they are already locked and do not require locking. Fix this by recording in tracked keys whether the key was locked exclusively or not. Note that lock downgrades are impossible, which is the behaviour we expect. This fixes facebook/mysql-5.6#587. Closes https://github.com/facebook/rocksdb/pull/2122 Differential Revision: D4861489 Pulled By: IslamAbdelRahman fbshipit-source-id: 58c7ebe7af098bf01b9774b666d3e9867747d8fd
This commit is contained in:
parent
1f8b119ed6
commit
9300ef5455
|
@ -107,7 +107,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||||
|
|
||||||
std::string key_str = key.ToString();
|
std::string key_str = key.ToString();
|
||||||
|
|
||||||
TrackKey(cfh_id, key_str, seq, read_only);
|
TrackKey(cfh_id, key_str, seq, read_only, exclusive);
|
||||||
|
|
||||||
// Always return OK. Confilct checking will happen at commit time.
|
// Always return OK. Confilct checking will happen at commit time.
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
|
|
@ -470,20 +470,22 @@ uint64_t TransactionBaseImpl::GetNumKeys() const {
|
||||||
}
|
}
|
||||||
|
|
||||||
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
|
void TransactionBaseImpl::TrackKey(uint32_t cfh_id, const std::string& key,
|
||||||
SequenceNumber seq, bool read_only) {
|
SequenceNumber seq, bool read_only,
|
||||||
|
bool exclusive) {
|
||||||
// Update map of all tracked keys for this transaction
|
// Update map of all tracked keys for this transaction
|
||||||
TrackKey(&tracked_keys_, cfh_id, key, seq, read_only);
|
TrackKey(&tracked_keys_, cfh_id, key, seq, read_only, exclusive);
|
||||||
|
|
||||||
if (save_points_ != nullptr && !save_points_->empty()) {
|
if (save_points_ != nullptr && !save_points_->empty()) {
|
||||||
// Update map of tracked keys in this SavePoint
|
// Update map of tracked keys in this SavePoint
|
||||||
TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only);
|
TrackKey(&save_points_->top().new_keys_, cfh_id, key, seq, read_only,
|
||||||
|
exclusive);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add a key to the given TransactionKeyMap
|
// Add a key to the given TransactionKeyMap
|
||||||
void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
|
void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
|
||||||
const std::string& key, SequenceNumber seq,
|
const std::string& key, SequenceNumber seq,
|
||||||
bool read_only) {
|
bool read_only, bool exclusive) {
|
||||||
auto& cf_key_map = (*key_map)[cfh_id];
|
auto& cf_key_map = (*key_map)[cfh_id];
|
||||||
auto iter = cf_key_map.find(key);
|
auto iter = cf_key_map.find(key);
|
||||||
if (iter == cf_key_map.end()) {
|
if (iter == cf_key_map.end()) {
|
||||||
|
@ -499,6 +501,7 @@ void TransactionBaseImpl::TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
|
||||||
} else {
|
} else {
|
||||||
iter->second.num_writes++;
|
iter->second.num_writes++;
|
||||||
}
|
}
|
||||||
|
iter->second.exclusive |= exclusive;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<TransactionKeyMap>
|
std::unique_ptr<TransactionKeyMap>
|
||||||
|
@ -529,7 +532,7 @@ TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
|
||||||
// All the reads/writes to this key were done in the last savepoint.
|
// All the reads/writes to this key were done in the last savepoint.
|
||||||
bool read_only = (num_writes == 0);
|
bool read_only = (num_writes == 0);
|
||||||
TrackKey(result, column_family_id, key, key_iter.second.seq,
|
TrackKey(result, column_family_id, key, key_iter.second.seq,
|
||||||
read_only);
|
read_only, key_iter.second.exclusive);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -227,12 +227,12 @@ class TransactionBaseImpl : public Transaction {
|
||||||
// seqno is the earliest seqno this key was involved with this transaction.
|
// seqno is the earliest seqno this key was involved with this transaction.
|
||||||
// readonly should be set to true if no data was written for this key
|
// readonly should be set to true if no data was written for this key
|
||||||
void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
|
void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
|
||||||
bool readonly);
|
bool readonly, bool exclusive);
|
||||||
|
|
||||||
// Helper function to add a key to the given TransactionKeyMap
|
// Helper function to add a key to the given TransactionKeyMap
|
||||||
static void TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
|
static void TrackKey(TransactionKeyMap* key_map, uint32_t cfh_id,
|
||||||
const std::string& key, SequenceNumber seqno,
|
const std::string& key, SequenceNumber seqno,
|
||||||
bool readonly);
|
bool readonly, bool exclusive);
|
||||||
|
|
||||||
// Called when UndoGetForUpdate determines that this key can be unlocked.
|
// Called when UndoGetForUpdate determines that this key can be unlocked.
|
||||||
virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
|
virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
|
||||||
|
|
|
@ -400,7 +400,7 @@ Status TransactionImpl::LockBatch(WriteBatch* batch,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
|
TrackKey(keys_to_unlock, cfh_id, std::move(key), kMaxSequenceNumber,
|
||||||
false);
|
false, true /* exclusive */);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
|
@ -426,6 +426,7 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||||
uint32_t cfh_id = GetColumnFamilyID(column_family);
|
uint32_t cfh_id = GetColumnFamilyID(column_family);
|
||||||
std::string key_str = key.ToString();
|
std::string key_str = key.ToString();
|
||||||
bool previously_locked;
|
bool previously_locked;
|
||||||
|
bool lock_upgrade = false;
|
||||||
Status s;
|
Status s;
|
||||||
|
|
||||||
// lock this key if this transactions hasn't already locked it
|
// lock this key if this transactions hasn't already locked it
|
||||||
|
@ -441,13 +442,17 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||||
if (iter == tracked_keys_cf->second.end()) {
|
if (iter == tracked_keys_cf->second.end()) {
|
||||||
previously_locked = false;
|
previously_locked = false;
|
||||||
} else {
|
} else {
|
||||||
|
if (!iter->second.exclusive && exclusive) {
|
||||||
|
lock_upgrade = true;
|
||||||
|
}
|
||||||
previously_locked = true;
|
previously_locked = true;
|
||||||
current_seqno = iter->second.seq;
|
current_seqno = iter->second.seq;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock this key if this transactions hasn't already locked it
|
// Lock this key if this transactions hasn't already locked it or we require
|
||||||
if (!previously_locked) {
|
// an upgrade.
|
||||||
|
if (!previously_locked || lock_upgrade) {
|
||||||
s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
|
s = txn_db_impl_->TryLock(this, cfh_id, key_str, exclusive);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -481,15 +486,21 @@ Status TransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
||||||
// Failed to validate key
|
// Failed to validate key
|
||||||
if (!previously_locked) {
|
if (!previously_locked) {
|
||||||
// Unlock key we just locked
|
// Unlock key we just locked
|
||||||
|
if (lock_upgrade) {
|
||||||
|
s = txn_db_impl_->TryLock(this, cfh_id, key_str,
|
||||||
|
false /* exclusive */);
|
||||||
|
assert(s.ok());
|
||||||
|
} else {
|
||||||
txn_db_impl_->UnLock(this, cfh_id, key.ToString());
|
txn_db_impl_->UnLock(this, cfh_id, key.ToString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
// Let base class know we've conflict checked this key.
|
// Let base class know we've conflict checked this key.
|
||||||
TrackKey(cfh_id, key_str, new_seqno, read_only);
|
TrackKey(cfh_id, key_str, new_seqno, read_only, exclusive);
|
||||||
}
|
}
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
|
|
|
@ -332,6 +332,43 @@ TEST_P(TransactionTest, SharedLocks) {
|
||||||
txn2->Rollback();
|
txn2->Rollback();
|
||||||
txn3->Rollback();
|
txn3->Rollback();
|
||||||
|
|
||||||
|
// Test txn1 and txn2 sharing a lock and txn2 trying to upgrade lock.
|
||||||
|
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn2->GetForUpdate(read_options, "foo", nullptr);
|
||||||
|
ASSERT_TRUE(s.IsTimedOut());
|
||||||
|
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
|
||||||
|
|
||||||
|
txn1->UndoGetForUpdate("foo");
|
||||||
|
s = txn2->GetForUpdate(read_options, "foo", nullptr);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
txn1->Rollback();
|
||||||
|
txn2->Rollback();
|
||||||
|
|
||||||
|
// Test txn1 trying to downgrade its lock.
|
||||||
|
s = txn1->GetForUpdate(read_options, "foo", nullptr, true /* exclusive */);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
|
||||||
|
ASSERT_TRUE(s.IsTimedOut());
|
||||||
|
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
|
||||||
|
|
||||||
|
// Should still fail after "downgrading".
|
||||||
|
s = txn1->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
|
||||||
|
ASSERT_OK(s);
|
||||||
|
|
||||||
|
s = txn2->GetForUpdate(read_options, "foo", nullptr, false /* exclusive */);
|
||||||
|
ASSERT_TRUE(s.IsTimedOut());
|
||||||
|
ASSERT_EQ(s.ToString(), "Operation timed out: Timeout waiting to lock key");
|
||||||
|
|
||||||
|
txn1->Rollback();
|
||||||
|
txn2->Rollback();
|
||||||
|
|
||||||
// Test txn1 holding an exclusive lock and txn2 trying to obtain shared
|
// Test txn1 holding an exclusive lock and txn2 trying to obtain shared
|
||||||
// access.
|
// access.
|
||||||
s = txn1->GetForUpdate(read_options, "foo", nullptr);
|
s = txn1->GetForUpdate(read_options, "foo", nullptr);
|
||||||
|
|
|
@ -24,8 +24,10 @@ struct TransactionKeyMapInfo {
|
||||||
uint32_t num_writes;
|
uint32_t num_writes;
|
||||||
uint32_t num_reads;
|
uint32_t num_reads;
|
||||||
|
|
||||||
|
bool exclusive;
|
||||||
|
|
||||||
explicit TransactionKeyMapInfo(SequenceNumber seq_no)
|
explicit TransactionKeyMapInfo(SequenceNumber seq_no)
|
||||||
: seq(seq_no), num_writes(0), num_reads(0) {}
|
: seq(seq_no), num_writes(0), num_reads(0), exclusive(false) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
using TransactionKeyMap =
|
using TransactionKeyMap =
|
||||||
|
|
Loading…
Reference in New Issue