mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-25 22:44:05 +00:00
transaction allocation perf improvements
Summary: Removed a couple of memory allocations Test Plan: changes covered by existing tests Reviewers: rven, yhchiang, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D53523
This commit is contained in:
parent
77926f93e5
commit
45768ade4f
|
@ -54,7 +54,7 @@ Status OptimisticTransactionImpl::Commit() {
|
|||
}
|
||||
|
||||
Status s = db_impl->WriteWithCallback(
|
||||
write_options_, write_batch_->GetWriteBatch(), &callback);
|
||||
write_options_, GetWriteBatch()->GetWriteBatch(), &callback);
|
||||
|
||||
if (s.ok()) {
|
||||
Clear();
|
||||
|
@ -77,7 +77,7 @@ Status OptimisticTransactionImpl::TryLock(ColumnFamilyHandle* column_family,
|
|||
|
||||
SequenceNumber seq;
|
||||
if (snapshot_) {
|
||||
seq = snapshot_->snapshot()->GetSequenceNumber();
|
||||
seq = snapshot_->GetSequenceNumber();
|
||||
} else {
|
||||
seq = db_->GetLatestSequenceNumber();
|
||||
}
|
||||
|
|
|
@ -21,14 +21,14 @@ TransactionBaseImpl::TransactionBaseImpl(DB* db,
|
|||
: db_(db),
|
||||
write_options_(write_options),
|
||||
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
|
||||
write_batch_(new WriteBatchWithIndex(cmp_, 0, true)),
|
||||
start_time_(db_->GetEnv()->NowMicros()) {}
|
||||
start_time_(db_->GetEnv()->NowMicros()),
|
||||
write_batch_(cmp_, 0, true) {}
|
||||
|
||||
TransactionBaseImpl::~TransactionBaseImpl() {}
|
||||
|
||||
void TransactionBaseImpl::Clear() {
|
||||
save_points_.reset(nullptr);
|
||||
write_batch_->Clear();
|
||||
write_batch_.Clear();
|
||||
tracked_keys_.clear();
|
||||
num_puts_ = 0;
|
||||
num_deletes_ = 0;
|
||||
|
@ -40,7 +40,11 @@ void TransactionBaseImpl::SetSnapshot() {
|
|||
auto db_impl = reinterpret_cast<DBImpl*>(db_);
|
||||
|
||||
const Snapshot* snapshot = db_impl->GetSnapshotForWriteConflictBoundary();
|
||||
snapshot_.reset(new ManagedSnapshot(db_, snapshot));
|
||||
|
||||
// Set a custom deleter for the snapshot_ SharedPtr as the snapshot needs to
|
||||
// be released, not deleted when it is no longer referenced.
|
||||
snapshot_.reset(snapshot, std::bind(&TransactionBaseImpl::ReleaseSnapshot,
|
||||
this, std::placeholders::_1, db_));
|
||||
snapshot_needed_ = false;
|
||||
snapshot_notifier_ = nullptr;
|
||||
}
|
||||
|
@ -84,7 +88,7 @@ void TransactionBaseImpl::SetSavePoint() {
|
|||
}
|
||||
save_points_->emplace(snapshot_, snapshot_needed_, snapshot_notifier_,
|
||||
num_puts_, num_deletes_, num_merges_);
|
||||
write_batch_->SetSavePoint();
|
||||
write_batch_.SetSavePoint();
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::RollbackToSavePoint() {
|
||||
|
@ -99,7 +103,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
|
|||
num_merges_ = save_point.num_merges_;
|
||||
|
||||
// Rollback batch
|
||||
Status s = write_batch_->RollbackToSavePoint();
|
||||
Status s = write_batch_.RollbackToSavePoint();
|
||||
assert(s.ok());
|
||||
|
||||
// Rollback any keys that were tracked since the last savepoint
|
||||
|
@ -119,7 +123,7 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
|
|||
|
||||
return s;
|
||||
} else {
|
||||
assert(write_batch_->RollbackToSavePoint().IsNotFound());
|
||||
assert(write_batch_.RollbackToSavePoint().IsNotFound());
|
||||
return Status::NotFound();
|
||||
}
|
||||
}
|
||||
|
@ -127,8 +131,8 @@ Status TransactionBaseImpl::RollbackToSavePoint() {
|
|||
Status TransactionBaseImpl::Get(const ReadOptions& read_options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key, std::string* value) {
|
||||
return write_batch_->GetFromBatchAndDB(db_, read_options, column_family, key,
|
||||
value);
|
||||
return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key,
|
||||
value);
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options,
|
||||
|
@ -189,7 +193,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options) {
|
|||
Iterator* db_iter = db_->NewIterator(read_options);
|
||||
assert(db_iter);
|
||||
|
||||
return write_batch_->NewIteratorWithBase(db_iter);
|
||||
return write_batch_.NewIteratorWithBase(db_iter);
|
||||
}
|
||||
|
||||
Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
|
||||
|
@ -197,7 +201,7 @@ Iterator* TransactionBaseImpl::GetIterator(const ReadOptions& read_options,
|
|||
Iterator* db_iter = db_->NewIterator(read_options, column_family);
|
||||
assert(db_iter);
|
||||
|
||||
return write_batch_->NewIteratorWithBase(column_family, db_iter);
|
||||
return write_batch_.NewIteratorWithBase(column_family, db_iter);
|
||||
}
|
||||
|
||||
Status TransactionBaseImpl::Put(ColumnFamilyHandle* column_family,
|
||||
|
@ -353,11 +357,11 @@ Status TransactionBaseImpl::DeleteUntracked(ColumnFamilyHandle* column_family,
|
|||
}
|
||||
|
||||
void TransactionBaseImpl::PutLogData(const Slice& blob) {
|
||||
write_batch_->PutLogData(blob);
|
||||
write_batch_.PutLogData(blob);
|
||||
}
|
||||
|
||||
WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
|
||||
return write_batch_.get();
|
||||
return &write_batch_;
|
||||
}
|
||||
|
||||
uint64_t TransactionBaseImpl::GetElapsedTime() const {
|
||||
|
@ -413,13 +417,17 @@ const TransactionKeyMap* TransactionBaseImpl::GetTrackedKeysSinceSavePoint() {
|
|||
WriteBatchBase* TransactionBaseImpl::GetBatchForWrite() {
|
||||
if (indexing_enabled_) {
|
||||
// Use WriteBatchWithIndex
|
||||
return write_batch_.get();
|
||||
return &write_batch_;
|
||||
} else {
|
||||
// Don't use WriteBatchWithIndex. Return base WriteBatch.
|
||||
return write_batch_->GetWriteBatch();
|
||||
return write_batch_.GetWriteBatch();
|
||||
}
|
||||
}
|
||||
|
||||
void TransactionBaseImpl::ReleaseSnapshot(const Snapshot* snapshot, DB* db) {
|
||||
db->ReleaseSnapshot(snapshot);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
|
|
@ -165,7 +165,7 @@ class TransactionBaseImpl : public Transaction {
|
|||
}
|
||||
|
||||
const Snapshot* GetSnapshot() const override {
|
||||
return snapshot_ ? snapshot_->snapshot() : nullptr;
|
||||
return snapshot_ ? snapshot_.get() : nullptr;
|
||||
}
|
||||
|
||||
void SetSnapshot() override;
|
||||
|
@ -202,6 +202,9 @@ class TransactionBaseImpl : public Transaction {
|
|||
write_options_ = write_options;
|
||||
}
|
||||
|
||||
// Used for memory management for snapshot_
|
||||
void ReleaseSnapshot(const Snapshot* snapshot, DB* db);
|
||||
|
||||
protected:
|
||||
// Add a key to the list of tracked keys.
|
||||
// seqno is the earliest seqno this key was involved with this transaction.
|
||||
|
@ -218,15 +221,12 @@ class TransactionBaseImpl : public Transaction {
|
|||
|
||||
const Comparator* cmp_;
|
||||
|
||||
// Records writes pending in this transaction
|
||||
std::unique_ptr<WriteBatchWithIndex> write_batch_;
|
||||
|
||||
// Stores that time the txn was constructed, in microseconds.
|
||||
const uint64_t start_time_;
|
||||
|
||||
// Stores the current snapshot that was was set by SetSnapshot or null if
|
||||
// no snapshot is currently set.
|
||||
std::shared_ptr<ManagedSnapshot> snapshot_;
|
||||
std::shared_ptr<const Snapshot> snapshot_;
|
||||
|
||||
// Count of various operations pending in this transaction
|
||||
uint64_t num_puts_ = 0;
|
||||
|
@ -234,7 +234,7 @@ class TransactionBaseImpl : public Transaction {
|
|||
uint64_t num_merges_ = 0;
|
||||
|
||||
struct SavePoint {
|
||||
std::shared_ptr<ManagedSnapshot> snapshot_;
|
||||
std::shared_ptr<const Snapshot> snapshot_;
|
||||
bool snapshot_needed_;
|
||||
std::shared_ptr<TransactionNotifier> snapshot_notifier_;
|
||||
uint64_t num_puts_;
|
||||
|
@ -244,7 +244,7 @@ class TransactionBaseImpl : public Transaction {
|
|||
// Record all keys tracked since the last savepoint
|
||||
TransactionKeyMap new_keys_;
|
||||
|
||||
SavePoint(std::shared_ptr<ManagedSnapshot> snapshot, bool snapshot_needed,
|
||||
SavePoint(std::shared_ptr<const Snapshot> snapshot, bool snapshot_needed,
|
||||
std::shared_ptr<TransactionNotifier> snapshot_notifier,
|
||||
uint64_t num_puts, uint64_t num_deletes, uint64_t num_merges)
|
||||
: snapshot_(snapshot),
|
||||
|
@ -256,6 +256,9 @@ class TransactionBaseImpl : public Transaction {
|
|||
};
|
||||
|
||||
private:
|
||||
// Records writes pending in this transaction
|
||||
WriteBatchWithIndex write_batch_;
|
||||
|
||||
// Stack of the Snapshot saved at each save point. Saved snapshots may be
|
||||
// nullptr if there was no snapshot at the time SetSavePoint() was called.
|
||||
std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint>> save_points_;
|
||||
|
|
|
@ -92,7 +92,7 @@ Status TransactionImpl::CommitBatch(WriteBatch* batch) {
|
|||
}
|
||||
|
||||
Status TransactionImpl::Commit() {
|
||||
Status s = DoCommit(write_batch_->GetWriteBatch());
|
||||
Status s = DoCommit(GetWriteBatch()->GetWriteBatch());
|
||||
|
||||
Clear();
|
||||
|
||||
|
@ -295,7 +295,7 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
|
|||
SequenceNumber* new_seqno) {
|
||||
assert(snapshot_);
|
||||
|
||||
SequenceNumber seq = snapshot_->snapshot()->GetSequenceNumber();
|
||||
SequenceNumber seq = snapshot_->GetSequenceNumber();
|
||||
if (prev_seqno <= seq) {
|
||||
// If the key has been previous validated at a sequence number earlier
|
||||
// than the curent snapshot's sequence number, we already know it has not
|
||||
|
@ -311,9 +311,9 @@ Status TransactionImpl::ValidateSnapshot(ColumnFamilyHandle* column_family,
|
|||
ColumnFamilyHandle* cfh =
|
||||
column_family ? column_family : db_impl->DefaultColumnFamily();
|
||||
|
||||
return TransactionUtil::CheckKeyForConflicts(
|
||||
db_impl, cfh, key.ToString(), snapshot_->snapshot()->GetSequenceNumber(),
|
||||
false /* cache_only */);
|
||||
return TransactionUtil::CheckKeyForConflicts(db_impl, cfh, key.ToString(),
|
||||
snapshot_->GetSequenceNumber(),
|
||||
false /* cache_only */);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
|
Loading…
Reference in a new issue