diff --git a/include/rocksdb/utilities/transaction.h b/include/rocksdb/utilities/transaction.h index 8507ef133f..a3519739c2 100644 --- a/include/rocksdb/utilities/transaction.h +++ b/include/rocksdb/utilities/transaction.h @@ -169,8 +169,26 @@ class Transaction { ColumnFamilyHandle* column_family, const Slice& key, std::string* value) = 0; + // An overload of the the above method that receives a PinnableSlice + // For backward compatiblity a default implementation is provided + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* pinnable_val) { + assert(pinnable_val != nullptr); + auto s = Get(options, column_family, key, pinnable_val->GetSelf()); + pinnable_val->PinSelf(); + return s; + } + virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value) = 0; + virtual Status Get(const ReadOptions& options, const Slice& key, + PinnableSlice* pinnable_val) { + assert(pinnable_val != nullptr); + auto s = Get(options, key, pinnable_val->GetSelf()); + pinnable_val->PinSelf(); + return s; + } virtual std::vector MultiGet( const ReadOptions& options, @@ -212,6 +230,22 @@ class Transaction { const Slice& key, std::string* value, bool exclusive = true) = 0; + // An overload of the the above method that receives a PinnableSlice + // For backward compatiblity a default implementation is provided + virtual Status GetForUpdate(const ReadOptions& options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* pinnable_val, + bool exclusive = true) { + if (pinnable_val == nullptr) { + std::string* null_str = nullptr; + return GetForUpdate(options, key, null_str); + } else { + auto s = GetForUpdate(options, key, pinnable_val->GetSelf()); + pinnable_val->PinSelf(); + return s; + } + } + virtual Status GetForUpdate(const ReadOptions& options, const Slice& key, std::string* value, bool exclusive = true) = 0; diff --git a/include/rocksdb/utilities/write_batch_with_index.h b/include/rocksdb/utilities/write_batch_with_index.h index 38809e1c78..24d8f30aa5 100644 --- a/include/rocksdb/utilities/write_batch_with_index.h +++ b/include/rocksdb/utilities/write_batch_with_index.h @@ -186,10 +186,20 @@ class WriteBatchWithIndex : public WriteBatchBase { // regardless). Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, const Slice& key, std::string* value); + + // An overload of the the above method that receives a PinnableSlice + Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, + const Slice& key, PinnableSlice* value); + Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value); + // An overload of the the above method that receives a PinnableSlice + Status GetFromBatchAndDB(DB* db, const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value); + // Records the state of the batch for future calls to RollbackToSavePoint(). // May be called multiple times to set multiple save points. void SetSavePoint() override; diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc index 0357c113f2..4612dfa549 100644 --- a/utilities/transactions/transaction_base.cc +++ b/utilities/transactions/transaction_base.cc @@ -181,8 +181,21 @@ Status TransactionBaseImpl::RollbackToSavePoint() { Status TransactionBaseImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = Get(read_options, column_family, key, &pinnable_val); + if (s.ok() && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; +} + +Status TransactionBaseImpl::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* pinnable_val) { return write_batch_.GetFromBatchAndDB(db_, read_options, column_family, key, - value); + pinnable_val); } Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, @@ -192,7 +205,26 @@ Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, Status s = TryLock(column_family, key, true /* read_only */, exclusive); if (s.ok() && value != nullptr) { - s = Get(read_options, column_family, key, value); + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + s = Get(read_options, column_family, key, &pinnable_val); + if (s.ok() && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + } + return s; +} + +Status TransactionBaseImpl::GetForUpdate(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + PinnableSlice* pinnable_val, + bool exclusive) { + Status s = TryLock(column_family, key, true /* read_only */, exclusive); + + if (s.ok() && pinnable_val != nullptr) { + s = Get(read_options, column_family, key, pinnable_val); } return s; } diff --git a/utilities/transactions/transaction_base.h b/utilities/transactions/transaction_base.h index 1514836489..c73b329f40 100644 --- a/utilities/transactions/transaction_base.h +++ b/utilities/transactions/transaction_base.h @@ -46,18 +46,27 @@ class TransactionBaseImpl : public Transaction { Status RollbackToSavePoint() override; + using Transaction::Get; Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) override; + Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) override; + Status Get(const ReadOptions& options, const Slice& key, std::string* value) override { return Get(options, db_->DefaultColumnFamily(), key, value); } + using Transaction::GetForUpdate; Status GetForUpdate(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool exclusive) override; + Status GetForUpdate(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* pinnable_val, bool exclusive) override; + Status GetForUpdate(const ReadOptions& options, const Slice& key, std::string* value, bool exclusive) override { return GetForUpdate(options, db_->DefaultColumnFamily(), key, value, diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index dc5d0fcf60..b2820109cc 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -385,8 +385,8 @@ class WBWIIteratorImpl : public WBWIIterator { }; struct WriteBatchWithIndex::Rep { - Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, - size_t max_bytes = 0, bool _overwrite_key = false) + explicit Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, + size_t max_bytes = 0, bool _overwrite_key = false) : write_batch(reserved_bytes, max_bytes), comparator(index_comparator, &write_batch), skip_list(comparator, &arena), @@ -743,8 +743,23 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, const ReadOptions& read_options, const Slice& key, std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, + &pinnable_val); + if (s.ok() && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + const Slice& key, + PinnableSlice* pinnable_val) { return GetFromBatchAndDB(db, read_options, db->DefaultColumnFamily(), key, - value); + pinnable_val); } Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, @@ -752,19 +767,38 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { + assert(value != nullptr); + PinnableSlice pinnable_val(value); + assert(!pinnable_val.IsPinned()); + auto s = + GetFromBatchAndDB(db, read_options, column_family, key, &pinnable_val); + if (s.ok() && pinnable_val.IsPinned()) { + value->assign(pinnable_val.data(), pinnable_val.size()); + } // else value is already assigned + return s; +} + +Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, + const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, + PinnableSlice* pinnable_val) { Status s; MergeContext merge_context; const ImmutableDBOptions& immuable_db_options = reinterpret_cast(db)->immutable_db_options(); - std::string batch_value; + // Since the lifetime of the WriteBatch is the same as that of the transaction + // we cannot pin it as otherwise the returned value will not be available + // after the transaction finishes. + std::string& batch_value = *pinnable_val->GetSelf(); WriteBatchWithIndexInternal::Result result = WriteBatchWithIndexInternal::GetFromBatch( immuable_db_options, this, column_family, key, &merge_context, &rep->comparator, &batch_value, rep->overwrite_key, &s); if (result == WriteBatchWithIndexInternal::Result::kFound) { - value->assign(batch_value.data(), batch_value.size()); + pinnable_val->PinSelf(); return s; } if (result == WriteBatchWithIndexInternal::Result::kDeleted) { @@ -785,7 +819,7 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, result == WriteBatchWithIndexInternal::Result::kNotFound); // Did not find key in batch OR could not resolve Merges. Try DB. - s = db->Get(read_options, column_family, key, value); + s = db->Get(read_options, column_family, key, pinnable_val); if (s.ok() || s.IsNotFound()) { // DB Get Succeeded if (result == WriteBatchWithIndexInternal::Result::kMergeInProgress) { @@ -797,18 +831,18 @@ Status WriteBatchWithIndex::GetFromBatchAndDB(DB* db, Env* env = immuable_db_options.env; Logger* logger = immuable_db_options.info_log.get(); - Slice db_slice(*value); Slice* merge_data; if (s.ok()) { - merge_data = &db_slice; + merge_data = pinnable_val; } else { // Key not present in db (s.IsNotFound()) merge_data = nullptr; } if (merge_operator) { - s = MergeHelper::TimedFullMerge(merge_operator, key, merge_data, - merge_context.GetOperands(), value, - logger, statistics, env); + s = MergeHelper::TimedFullMerge( + merge_operator, key, merge_data, merge_context.GetOperands(), + pinnable_val->GetSelf(), logger, statistics, env); + pinnable_val->PinSelf(); } else { s = Status::InvalidArgument("Options::merge_operator must be set"); }