mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-02 01:16:16 +00:00
06e593376c
Summary:

## Context/Summary

Similar to https://github.com/facebook/rocksdb/pull/11288 and https://github.com/facebook/rocksdb/pull/11444, categorizing SST/blob file writes according to different IO activities allows more insight into the activity.

For that, this PR does the following:
- Tag different write IOs by passing down and converting WriteOptions to IOOptions
- Add a new SST_WRITE_MICROS histogram in WritableFileWriter::Append() and break it down into FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS

Some related code refactoring to make the implementation cleaner:
- Blob stats
  - Replace the high-level write measurement with the low-level WritableFileWriter::Append() measurement for BLOB_DB_BLOB_FILE_WRITE_MICROS. This is to make FILE_WRITE_{FLUSH|COMPACTION|DB_OPEN}_MICROS include blob files. As a consequence, this introduces some behavioral changes to it; see HISTORY and the db bench test plan below for more info.
  - Fix bugs where BLOB_DB_BLOB_FILE_SYNCED/BLOB_DB_BLOB_FILE_BYTES_WRITTEN included files that failed to sync and bytes that failed to write.
- Refactor the WriteOptions constructor for easier construction with io_activity and rate_limiter_priority
- Refactor DBImpl::~DBImpl()/BlobDBImpl::Close() to bypass thread op verification
- Build table
  - TableBuilderOptions now includes Read/WriteOptions so BuildTable() does not need to take these two variables
  - Replace the io_priority passed into BuildTable() with TableBuilderOptions::WriteOptions::rate_limiter_priority. Similar for BlobFileBuilder. This parameter is used for dynamically changing file IO priority for flush, see https://github.com/facebook/rocksdb/pull/9988?fbclid=IwAR1DtKel6c-bRJAdesGo0jsbztRtciByNlvokbxkV6h_L-AE9MACzqRTT5s for more
- Update ThreadStatus::FLUSH_BYTES_WRITTEN to use io_activity to track flush IO in flush job and db open instead of io_priority

## Test

### db bench

Flush
```
./db_bench --statistics=1 --benchmarks=fillseq --num=100000 --write_buffer_size=100

rocksdb.sst.write.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.flush.micros P50 : 1.830863 P95 : 4.094720 P99 : 6.578947 P100 : 26.000000 COUNT : 7875 SUM : 20377
rocksdb.file.write.compaction.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.db.open.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
```

Compaction, db open
```
Setup: ./db_bench --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench

Run: ./db_bench --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1

rocksdb.sst.write.micros P50 : 2.675325 P95 : 9.578788 P99 : 18.780000 P100 : 314.000000 COUNT : 638 SUM : 3279
rocksdb.file.write.flush.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
rocksdb.file.write.compaction.micros P50 : 2.757353 P95 : 9.610687 P99 : 19.316667 P100 : 314.000000 COUNT : 615 SUM : 3213
rocksdb.file.write.db.open.micros P50 : 2.055556 P95 : 3.925000 P99 : 9.000000 P100 : 9.000000 COUNT : 23 SUM : 66
```

Blob stats - just to make sure they aren't broken by this PR
```
Integrated Blob DB

Setup: ./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench

Run: ./db_bench --enable_blob_files=1 --statistics=1 --benchmarks=compact --db=../db_bench --use_existing_db=1

pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 7.298246 P95 : 9.771930 P99 : 9.991813 P100 : 16.000000 COUNT : 235 SUM : 1600
rocksdb.blobdb.blob.file.synced COUNT : 1
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842

post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 2.000000 P95 : 2.829360 P99 : 2.993779 P100 : 9.000000 COUNT : 707 SUM : 1614
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() counting as one post-PR, while in pre-PR, 3 Append()s count as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 1 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 34842 (stay the same)
```

```
Stacked Blob DB

Run: ./db_bench --use_blob_db=1 --statistics=1 --benchmarks=fillseq --num=10000 --disable_auto_compactions=1 -write_buffer_size=100 --db=../db_bench

pre-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 12.808042 P95 : 19.674497 P99 : 28.539683 P100 : 51.000000 COUNT : 10000 SUM : 140876
rocksdb.blobdb.blob.file.synced COUNT : 8
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445

post-PR:
rocksdb.blobdb.blob.file.write.micros P50 : 1.657370 P95 : 2.952175 P99 : 3.877519 P100 : 24.000000 COUNT : 30001 SUM : 67924
- COUNT is higher and values are smaller as it includes header and footer write
- COUNT is 3X higher due to each Append() counting as one post-PR, while in pre-PR, 3 Append()s count as one. See https://github.com/facebook/rocksdb/pull/11910/files#diff-32b811c0a1c000768cfb2532052b44dc0b3bf82253f3eab078e15ff201a0dabfL157-L164
rocksdb.blobdb.blob.file.synced COUNT : 8 (stay the same)
rocksdb.blobdb.blob.file.bytes.written COUNT : 1043445 (stay the same)
```

### Rehearsal CI stress test

Trigger 3 full runs of all our CI stress tests

### Performance

Flush
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=ManualFlush/key_num:524288/per_key_size:256 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark; enable_statistics = true

Pre-pr: avg 507515519.3 ns
497686074,499444327,500862543,501389862,502994471,503744435,504142123,504224056,505724198,506610393,506837742,506955122,507695561,507929036,508307733,508312691,508999120,509963561,510142147,510698091,510743096,510769317,510957074,511053311,511371367,511409911,511432960,511642385,511691964,511730908,

Post-pr: avg 511971266.5 ns, regressed 0.88%
502744835,506502498,507735420,507929724,508313335,509548582,509994942,510107257,510715603,511046955,511352639,511458478,512117521,512317380,512766303,512972652,513059586,513804934,513808980,514059409,514187369,514389494,514447762,514616464,514622882,514641763,514666265,514716377,514990179,515502408,
```

Compaction
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_{pre|post}_pr --benchmark_filter=ManualCompaction/comp_style:0/max_data:134217728/per_key_size:256/enable_statistics:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark

Pre-pr: avg 495346098.30 ns
492118301,493203526,494201411,494336607,495269217,495404950,496402598,497012157,497358370,498153846

Post-pr: avg 504528077.20, regressed 1.85%. "ManualCompaction" includes flush, so the isolated regression for compaction should be around 1.85-0.88 = 0.97%
502465338,502485945,502541789,502909283,503438601,504143885,506113087,506629423,507160414,507393007
```

Put with WAL (in case passing WriteOptions slows down this path even without collecting SST write stats)
```
TEST_TMPDIR=/dev/shm ./db_basic_bench_pre_pr --benchmark_filter=DBPut/comp_style:0/max_data:107374182400/per_key_size:256/enable_statistics:1/wal:1 --benchmark_repetitions=1000
-- default: 1 thread is used to run benchmark

Pre-pr: avg 3848.10 ns
3814,3838,3839,3848,3854,3854,3854,3860,3860,3860

Post-pr: avg 3874.20 ns, regressed 0.68%
3863,3867,3871,3874,3875,3877,3877,3877,3880,3881
```

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11910
Reviewed By: ajkr
Differential Revision: D49788060
Pulled By: hx235
fbshipit-source-id: 79e73699cda5be3b66461687e5147c2484fc5eff
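The regression percentages quoted in the performance section can be reproduced from the reported averages. The following is a minimal sketch of that arithmetic; the helper name `RegressionPercent` is hypothetical and not part of RocksDB or its benchmark tooling.

```cpp
#include <cassert>
#include <cmath>

// Hypothetical helper: relative change of the post-PR average versus the
// pre-PR average, in percent. A positive result means the post-PR run was
// slower.
double RegressionPercent(double pre_avg_ns, double post_avg_ns) {
  return (post_avg_ns - pre_avg_ns) / pre_avg_ns * 100.0;
}

// Subtracts the flush regression from the combined flush+compaction
// regression, as the commit message does, to estimate compaction alone.
double IsolatedRegression(double combined_percent, double flush_percent) {
  return combined_percent - flush_percent;
}
```

With the averages from the commit message, `RegressionPercent(507515519.3, 511971266.5)` is about 0.88 for flush, `RegressionPercent(495346098.30, 504528077.20)` is about 1.85 for "ManualCompaction", and `RegressionPercent(3848.10, 3874.20)` is about 0.68 for Put with WAL.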
1088 lines
41 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "utilities/transactions/write_unprepared_txn.h"

#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"

namespace ROCKSDB_NAMESPACE {

bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
  // Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
  // in unprep_seqs, we have to check if seq is equal to prep_seq or any of
  // the prepare_batch_cnt seq nums after it.
  //
  // TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
  // large.
  for (const auto& it : unprep_seqs_) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }

  bool snap_released = false;
  auto ret =
      db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
  assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
  snap_released_ |= snap_released;
  return ret;
}
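
The TODO in IsVisibleFullCheck suggests replacing the linear scan with a binary search. The sketch below illustrates one way this could look, assuming the container is an ordered map from the first sequence number of a batch to its batch count and that the ranges never overlap. `UnprepSeqs`, `ContainsLinear` and `ContainsOrdered` are hypothetical names for illustration, and the sketch uses `std::map::upper_bound` rather than the `std::lower_bound` named in the TODO.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

// Hypothetical stand-in for unprep_seqs_: first sequence number of a batch
// mapped to the number of consecutive sequence numbers the batch occupies.
using UnprepSeqs = std::map<uint64_t, std::size_t>;

// Linear scan, mirroring the loop in IsVisibleFullCheck.
bool ContainsLinear(const UnprepSeqs& seqs, uint64_t seq) {
  for (const auto& it : seqs) {
    if (it.first <= seq && seq < it.first + it.second) {
      return true;
    }
  }
  return false;
}

// Logarithmic lookup: locate the last range that starts at or before seq.
// Assumes ranges do not overlap, so only that one range can contain seq.
bool ContainsOrdered(const UnprepSeqs& seqs, uint64_t seq) {
  auto it = seqs.upper_bound(seq);  // first range starting strictly after seq
  if (it == seqs.begin()) {
    return false;  // every range starts after seq
  }
  --it;  // last range with start <= seq
  return seq < it->first + it->second;
}
```

For a map holding `{10: 3, 20: 1}`, both functions report 10, 11, 12 and 20 as contained and every other sequence number as not contained.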

WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
                                       const WriteOptions& write_options,
                                       const TransactionOptions& txn_options)
    : WritePreparedTxn(txn_db, write_options, txn_options),
      wupt_db_(txn_db),
      last_log_number_(0),
      recovered_txn_(false),
      largest_validated_seq_(0) {
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }
}

WriteUnpreparedTxn::~WriteUnpreparedTxn() {
  if (!unprep_seqs_.empty()) {
    assert(log_number_ > 0);
    assert(GetId() > 0);
    assert(!name_.empty());

    // We should rollback regardless of GetState, but some unit tests that
    // test crash recovery run the destructor assuming that rollback does not
    // happen, so that rollback during recovery can be exercised.
    if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
      auto s = RollbackInternal();
      assert(s.ok());
      if (!s.ok()) {
        ROCKS_LOG_FATAL(
            wupt_db_->info_log_,
            "Rollback of WriteUnprepared transaction failed in destructor: %s",
            s.ToString().c_str());
      }
      dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
          log_number_);
    }
  }

  // Clear the tracked locks so that ~PessimisticTransaction does not
  // try to unlock keys for recovered transactions.
  if (recovered_txn_) {
    tracked_locks_->Clear();
  }
}

void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
  PessimisticTransaction::Initialize(txn_options);
  if (txn_options.write_batch_flush_threshold < 0) {
    write_batch_flush_threshold_ =
        txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
  } else {
    write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  assert(active_iterators_.empty());
  active_iterators_.clear();
  untracked_keys_.clear();
}

Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
  Status s;
  if (active_iterators_.empty()) {
    s = MaybeFlushWriteBatchToDB();
    if (!s.ok()) {
      return s;
    }
  }
  s = do_write();
  if (s.ok()) {
    if (snapshot_) {
      largest_validated_seq_ =
          std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
    } else {
      // TODO(lth): We should use the same number as tracked_at_seq in TryLock,
      // because what is actually being tracked is the sequence number at which
      // this key was locked at.
      largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
    }
  }
  return s;
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
                               const SliceParts& key, const SliceParts& value,
                               const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
                                 const Slice& key, const Slice& value,
                                 const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Merge(column_family, key, value,
                                      assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const Slice& key, const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key,
                                  const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const Slice& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
                                        const SliceParts& key,
                                        const bool assume_tracked) {
  return HandleWrite([&]() {
    return TransactionBaseImpl::SingleDelete(column_family, key,
                                             assume_tracked);
  });
}

// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is just to "retrack" the key
// so that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an optimization,
// recovered transactions do not hold locks on their keys. This follows the
// implementation in PessimisticTransactionDB::Initialize where we set
// skip_concurrency_control to true.
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
  struct TrackKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                     false /* read_only */, true /* exclusive */);
      return Status::OK();
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
                       false /* read_only */, true /* exclusive */);
      }
      return Status::OK();
    }

    // Recovered batches do not contain 2PC markers.
    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkNoop(bool) override { return Status::InvalidArgument(); }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  TrackKeyHandler handler(this,
                          wupt_db_->txn_db_options_.rollback_merge_operands);
  return wb->Iterate(&handler);
}

Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  const bool kPrepared = true;
  Status s;
  if (write_batch_flush_threshold_ > 0 &&
      write_batch_.GetWriteBatch()->Count() > 0 &&
      write_batch_.GetDataSize() >
          static_cast<size_t>(write_batch_flush_threshold_)) {
    assert(GetState() != PREPARED);
    s = FlushWriteBatchToDB(!kPrepared);
  }
  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
  // If the current write batch contains savepoints, then some special handling
  // is required so that RollbackToSavepoint can work.
  //
  // RollbackToSavepoint is not supported after Prepare() is called, so only do
  // this for unprepared batches.
  if (!prepared && unflushed_save_points_ != nullptr &&
      !unflushed_save_points_->empty()) {
    return FlushWriteBatchWithSavePointToDB();
  }

  return FlushWriteBatchToDBInternal(prepared);
}

Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
  if (name_.empty()) {
    assert(!prepared);
#ifndef NDEBUG
    static std::atomic_ullong autogen_id{0};
    // To avoid changing all tests to call SetName, just autogenerate one.
    if (wupt_db_->txn_db_options_.autogenerate_name) {
      auto s = SetName(std::string("autoxid") +
                       std::to_string(autogen_id.fetch_add(1)));
      assert(s.ok());
    } else
#endif
    {
      return Status::InvalidArgument("Cannot write to DB without SetName.");
    }
  }

  struct UntrackedKeyHandler : public WriteBatch::Handler {
    WriteUnpreparedTxn* txn_;
    bool rollback_merge_operands_;

    UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
        : txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}

    Status AddUntrackedKey(uint32_t cf, const Slice& key) {
      auto str = key.ToString();
      PointLockStatus lock_status =
          txn_->tracked_locks_->GetPointLockStatus(cf, str);
      if (!lock_status.locked) {
        txn_->untracked_keys_[cf].push_back(str);
      }
      return Status::OK();
    }

    Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
      return AddUntrackedKey(cf, key);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return AddUntrackedKey(cf, key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
      if (rollback_merge_operands_) {
        return AddUntrackedKey(cf, key);
      }
      return Status::OK();
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  UntrackedKeyHandler handler(
      this, wupt_db_->txn_db_options_.rollback_merge_operands);
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
  assert(s.ok());

  // TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
  WriteOptions write_options = write_options_;
  write_options.disableWAL = false;
  const bool WRITE_AFTER_COMMIT = true;
  const bool first_prepare_batch = log_number_ == 0;
  // MarkEndPrepare will change Noop marker to the appropriate marker.
  s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
                                         name_, !WRITE_AFTER_COMMIT, !prepared);
  assert(s.ok());
  // For each duplicate key we account for a new sub-batch
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
  // AddPrepared better to be called in the pre-release callback otherwise there
  // is a non-zero chance of max advancing prepare_seq and readers assume the
  // data as committed.
  // Also having it in the PreReleaseCallback allows in-order addition of
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
  // to SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, prepare_batch_cnt_,
      db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
  const bool DISABLE_MEMTABLE = true;
  uint64_t seq_used = kMaxSequenceNumber;
  // log_number_ should refer to the oldest log containing uncommitted data
  // from the current transaction. This means that if log_number_ is set,
  // WriteImpl should not overwrite that value, so set log_used to nullptr if
  // log_number_ is already set.
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
                          /*callback*/ nullptr, &last_log_number_,
                          /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                          prepare_batch_cnt_, &add_prepared_callback);
  if (log_number_ == 0) {
    log_number_ = last_log_number_;
  }
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  auto prepare_seq = seq_used;

  // Only call SetId if it hasn't been set yet.
  if (GetId() == 0) {
    SetId(prepare_seq);
  }
  // unprep_seqs_ will also contain prepared seqnos since they are treated in
  // the same way in the prepare/commit callbacks. See the comment on the
  // definition of unprep_seqs_.
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;

  // Reset transaction state.
  if (!prepared) {
    prepare_batch_cnt_ = 0;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  return s;
}

Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
  assert(unflushed_save_points_ != nullptr &&
         unflushed_save_points_->size() > 0);
  assert(save_points_ != nullptr && save_points_->size() > 0);
  assert(save_points_->size() >= unflushed_save_points_->size());

  // Handler class for creating an unprepared batch from a savepoint.
  struct SavePointBatchHandler : public WriteBatch::Handler {
    WriteBatchWithIndex* wb_;
    const std::map<uint32_t, ColumnFamilyHandle*>& handles_;

    SavePointBatchHandler(
        WriteBatchWithIndex* wb,
        const std::map<uint32_t, ColumnFamilyHandle*>& handles)
        : wb_(wb), handles_(handles) {}

    Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Put(handles_.at(cf), key, value);
    }

    Status DeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->Delete(handles_.at(cf), key);
    }

    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
      return wb_->SingleDelete(handles_.at(cf), key);
    }

    Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
      return wb_->Merge(handles_.at(cf), key, value);
    }

    // The only expected 2PC marker is the initial Noop marker.
    Status MarkNoop(bool empty_batch) override {
      return empty_batch ? Status::OK() : Status::InvalidArgument();
    }

    Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }

    Status MarkEndPrepare(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkCommit(const Slice&) override {
      return Status::InvalidArgument();
    }

    Status MarkRollback(const Slice&) override {
      return Status::InvalidArgument();
    }
  };

  // The comparator of the default cf is passed in, similar to the
  // initialization of TransactionBaseImpl::write_batch_. This comparator is
  // only used if the write batch encounters an invalid cf id, and falls back to
  // this comparator.
  WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
                         true, 0, write_options_.protection_bytes_per_key);
  // Swap with write_batch_ so that wb contains the complete write batch. The
  // actual write batch that will be flushed to DB will be built in
  // write_batch_, and will be read by FlushWriteBatchToDBInternal.
  std::swap(wb, write_batch_);
  TransactionBaseImpl::InitWriteBatch();

  size_t prev_boundary = WriteBatchInternal::kHeader;
  const bool kPrepared = true;
  for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
    bool trailing_batch = i == unflushed_save_points_->size();
    SavePointBatchHandler sp_handler(&write_batch_,
                                     *wupt_db_->GetCFHandleMap().get());
    size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
                                          : (*unflushed_save_points_)[i];

    // Construct the partial write batch up to the savepoint.
    //
    // Theoretically, a memcpy between the write batches should be sufficient
    // since the rewriting into the batch should produce the exact same byte
    // representation. Rebuilding the WriteBatchWithIndex index is still
    // necessary, though, and would imply doing two passes over the batch.
    Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
                                           prev_boundary, curr_boundary);
    if (!s.ok()) {
      return s;
    }

    if (write_batch_.GetWriteBatch()->Count() > 0) {
      // Flush the write batch.
      s = FlushWriteBatchToDBInternal(!kPrepared);
      if (!s.ok()) {
        return s;
      }
    }

    if (!trailing_batch) {
      if (flushed_save_points_ == nullptr) {
        flushed_save_points_.reset(
            new autovector<WriteUnpreparedTxn::SavePoint>());
      }
      flushed_save_points_->emplace_back(
          unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
    }

    prev_boundary = curr_boundary;
    const bool kClear = true;
    TransactionBaseImpl::InitWriteBatch(kClear);
  }

  unflushed_save_points_->clear();
  return Status::OK();
}
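
The boundary walk in FlushWriteBatchWithSavePointToDB can be looked at in isolation. The sketch below assumes savepoints are stored as byte offsets into the serialized batch and only computes the half-open ranges that the loop iterates over; `Segment` and `SplitAtSavePoints` are hypothetical names, and no RocksDB types are involved.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Half-open byte range [begin, end) within a serialized batch.
using Segment = std::pair<std::size_t, std::size_t>;

// Splits a batch of data_size bytes at the given savepoint offsets, following
// the prev_boundary/curr_boundary walk in the function above. header_size
// plays the role of WriteBatchInternal::kHeader. The final segment is the
// trailing batch after the last savepoint.
std::vector<Segment> SplitAtSavePoints(
    std::size_t header_size, const std::vector<std::size_t>& save_points,
    std::size_t data_size) {
  std::vector<Segment> segments;
  std::size_t prev_boundary = header_size;
  for (std::size_t i = 0; i < save_points.size() + 1; i++) {
    const bool trailing_batch = (i == save_points.size());
    const std::size_t curr_boundary =
        trailing_batch ? data_size : save_points[i];
    segments.emplace_back(prev_boundary, curr_boundary);
    prev_boundary = curr_boundary;
  }
  return segments;
}
```

Two savepoints at the same offset produce an empty segment. In the original loop this corresponds to a write batch whose Count() is 0, which is skipped rather than flushed, while a flushed savepoint entry is still recorded for it.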

Status WriteUnpreparedTxn::PrepareInternal() {
  const bool kPrepared = true;
  return FlushWriteBatchToDB(kPrepared);
}

Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
  if (unprep_seqs_.empty()) {
    assert(log_number_ == 0);
    assert(GetId() == 0);
    return WritePreparedTxn::CommitWithoutPrepareInternal();
  }

  // TODO(lth): We should optimize commit without prepare to not perform
  // a prepare under the hood.
  auto s = PrepareInternal();
  if (!s.ok()) {
    return s;
  }
  return CommitInternal();
}

Status WriteUnpreparedTxn::CommitInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared commit logic.

  // We take the commit-time batch and append the Commit marker. The Memtable
  // will ignore the Commit marker in non-recovery mode
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
  const bool empty = working_batch->Count() == 0;
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
  assert(s.ok());

  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
  if (!empty) {
    // When not writing to memtable, we can still cache the latest write batch.
    // The cached batch will be written to memtable in WriteRecoverableState
    // during FlushMemTable
    if (for_recovery) {
      WriteBatchInternal::SetAsLatestPersistentState(working_batch);
    } else {
      return Status::InvalidArgument(
          "Commit-time-batch can only be used if "
          "use_only_the_last_commit_time_batch_for_recovery is true");
    }
  }

  const bool includes_data = !empty && !for_recovery;
  size_t commit_batch_cnt = 0;
  if (UNLIKELY(includes_data)) {
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
                   "Duplicate key overhead");
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
    s = working_batch->Iterate(&counter);
    assert(s.ok());
    commit_batch_cnt = counter.BatchCount();
  }
  const bool disable_memtable = !includes_data;
  const bool do_one_write =
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;

  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
  const bool kFirstPrepareBatch = true;
  AddPreparedCallback add_prepared_callback(
      wpt_db_, db_impl_, commit_batch_cnt,
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
  PreReleaseCallback* pre_release_callback;
  if (do_one_write) {
    pre_release_callback = &update_commit_map;
  } else {
    pre_release_callback = &add_prepared_callback;
  }
  uint64_t seq_used = kMaxSequenceNumber;
  // Since the prepared batch is directly written to memtable, there is
  // already a connection between the memtable and its WAL, so there is no
  // need to redundantly reference the log that contains the prepared data.
  const uint64_t zero_log_number = 0ull;
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
                          zero_log_number, disable_memtable, &seq_used,
                          batch_cnt, pre_release_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  const SequenceNumber commit_batch_seq = seq_used;
  if (LIKELY(do_one_write || !s.ok())) {
    if (LIKELY(s.ok())) {
      // Note RemovePrepared should be called after WriteImpl that published
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
      for (const auto& seq : unprep_seqs_) {
        wpt_db_->RemovePrepared(seq.first, seq.second);
      }
    }
    if (UNLIKELY(!do_one_write)) {
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write to publish seq

  // Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
  // commit write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  // Note: the 2nd write comes with a performance penalty. So if we have too
  // many commits accompanied with CommitTimeWriteBatch and yet we cannot
  // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
  // two_write_queues should be disabled to avoid many additional writes here.

  // Update commit map only from the 2nd queue
  WriteBatch empty_batch;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  const bool DISABLE_MEMTABLE = true;
  const size_t ONE_BATCH = 1;
  const uint64_t NO_REF_LOG = 0;
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_commit_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Note RemovePrepared should be called after WriteImpl that published the
  // seq. Otherwise SmallestUnCommittedSeq optimization breaks.
  for (const auto& seq : unprep_seqs_) {
    wpt_db_->RemovePrepared(seq.first, seq.second);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

Status WriteUnpreparedTxn::WriteRollbackKeys(
    const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch,
    ReadCallback* callback, const ReadOptions& roptions) {
  // This assertion can be removed when range lock is supported.
  assert(lock_tracker.IsPointLockSupported());
  const auto& cf_map = *wupt_db_->GetCFHandleMap();
  auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
    const auto& cf_handle = cf_map.at(cfid);
    PinnableSlice pinnable_val;
    bool not_used;
    DBImpl::GetImplOptions get_impl_options;
    get_impl_options.column_family = cf_handle;
    get_impl_options.value = &pinnable_val;
    get_impl_options.value_found = &not_used;
    get_impl_options.callback = callback;
    auto s = db_impl_->GetImpl(roptions, key, get_impl_options);

    if (s.ok()) {
      s = rollback_batch->Put(cf_handle, key, pinnable_val);
      assert(s.ok());
    } else if (s.IsNotFound()) {
      if (wupt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
        s = rollback_batch->SingleDelete(cf_handle, key);
      } else {
        s = rollback_batch->Delete(cf_handle, key);
      }
      assert(s.ok());
    } else {
      return s;
    }

    return Status::OK();
  };

  std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
      lock_tracker.GetColumnFamilyIterator());
  assert(cf_it != nullptr);
  while (cf_it->HasNext()) {
    ColumnFamilyId cf = cf_it->Next();
    std::unique_ptr<LockTracker::KeyIterator> key_it(
        lock_tracker.GetKeyIterator(cf));
    assert(key_it != nullptr);
    while (key_it->HasNext()) {
      const std::string& key = key_it->Next();
      auto s = WriteRollbackKey(key, cf);
      if (!s.ok()) {
        return s;
      }
    }
  }

  for (const auto& cfkey : untracked_keys_) {
    const auto cfid = cfkey.first;
    const auto& keys = cfkey.second;
    for (const auto& key : keys) {
      auto s = WriteRollbackKey(key, cfid);
      if (!s.ok()) {
        return s;
      }
    }
  }

  return Status::OK();
}
|
|
|
Status WriteUnpreparedTxn::RollbackInternal() {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  WriteBatchWithIndex rollback_batch(
      wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0,
      write_options_.protection_bytes_per_key);
  assert(GetId() != kMaxSequenceNumber);
  assert(GetId() > 0);
  Status s;
  auto read_at_seq = kMaxSequenceNumber;
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions roptions;
  // to prevent the callback's seq from being overridden inside DBImpl::Get
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
  // Note that we do not use WriteUnpreparedTxnReadCallback because we do not
  // need to read our own writes when reading prior versions of the key for
  // rollback.
  WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
  // TODO(lth): We write rollback batch all in a single batch here, but this
  // should be subdivided into multiple batches as well. In phase 2, when key
  // sets are read from WAL, this will happen naturally.
  s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  // The Rollback marker will be used as a batch separator
  s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
  assert(s.ok());
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
  const bool DISABLE_MEMTABLE = true;
  const uint64_t NO_REF_LOG = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  // Rollback batch may contain duplicate keys, because tracked_keys_ is not
  // comparator aware.
  auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
  // We commit the rolled back prepared batches. Although this is
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
  // already canceled out by the rollback batch, ii) adding the commit entry to
  // CommitCache will allow us to benefit from the existing mechanism in
  // CommitCache that keeps an entry evicted due to max advance and yet overlaps
  // with a live snapshot around so that the live snapshot properly skips the
  // entry even if its prepare seq is lower than max_evicted_seq_.
  //
  // TODO(lth): RollbackInternal is conceptually very similar to
  // CommitInternal, with the rollback batch simply taking on the role of
  // CommitTimeWriteBatch. We should be able to merge the two code paths.
  WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
      wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
  // Note: the rollback batch does not need AddPrepared since it is written to
  // DB in one shot. min_uncommitted still works since it requires capturing
  // data that is written to DB but not yet committed, while the rollback
  // batch commits with PreReleaseCallback.
  s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
                          &seq_used, rollback_batch_cnt,
                          do_one_write ? &update_commit_map : nullptr);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  if (!s.ok()) {
    return s;
  }
  if (do_one_write) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
    unprep_seqs_.clear();
    flushed_save_points_.reset(nullptr);
    unflushed_save_points_.reset(nullptr);
    return s;
  }  // else do the 2nd write for commit

  uint64_t& prepare_seq = seq_used;
  // Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
  // rollback write batch as just another "unprepared" batch. This will also
  // update the unprep_seqs_ in the update_commit_map callback.
  unprep_seqs_[prepare_seq] = rollback_batch_cnt;
  WriteUnpreparedCommitEntryPreReleaseCallback
      update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);

  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "RollbackInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  WriteBatch empty_batch;
  const size_t ONE_BATCH = 1;
  s = empty_batch.PutLogData(Slice());
  assert(s.ok());
  // In the absence of Prepare markers, use Noop as a batch separator
  s = WriteBatchInternal::InsertNoop(&empty_batch);
  assert(s.ok());
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                          &update_commit_map_with_rollback_batch);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  // Mark the txn as rolled back
  if (s.ok()) {
    for (const auto& seq : unprep_seqs_) {
      wpt_db_->RemovePrepared(seq.first, seq.second);
    }
  }

  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  return s;
}

void WriteUnpreparedTxn::Clear() {
  if (!recovered_txn_) {
    txn_db_impl_->UnLock(this, *tracked_locks_);
  }
  unprep_seqs_.clear();
  flushed_save_points_.reset(nullptr);
  unflushed_save_points_.reset(nullptr);
  recovered_txn_ = false;
  largest_validated_seq_ = 0;
  for (auto& it : active_iterators_) {
    auto bdit = static_cast<BaseDeltaIterator*>(it);
    bdit->Invalidate(Status::InvalidArgument(
        "Cannot use iterator after transaction has finished"));
  }
  active_iterators_.clear();
  untracked_keys_.clear();
  TransactionBaseImpl::Clear();
}

void WriteUnpreparedTxn::SetSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  PessimisticTransaction::SetSavePoint();
  if (unflushed_save_points_ == nullptr) {
    unflushed_save_points_.reset(new autovector<size_t>());
  }
  unflushed_save_points_->push_back(write_batch_.GetDataSize());
}

Status WriteUnpreparedTxn::RollbackToSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::RollbackToSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    return RollbackToSavePointInternal();
  }

  return Status::NotFound();
}

Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
  Status s;

  const bool kClear = true;
  TransactionBaseImpl::InitWriteBatch(kClear);

  assert(flushed_save_points_->size() > 0);
  WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();

  assert(save_points_ != nullptr && save_points_->size() > 0);
  const LockTracker& tracked_keys = *save_points_->top().new_locks_;

  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions roptions;
  roptions.snapshot = top.snapshot_->snapshot();
  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(roptions.snapshot)
          ->min_uncommitted_;
  SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          top.unprep_seqs_,
                                          kBackedByDBSnapshot);
  s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
  if (!s.ok()) {
    return s;
  }

  const bool kPrepared = true;
  s = FlushWriteBatchToDBInternal(!kPrepared);
  if (!s.ok()) {
    return s;
  }

  // PessimisticTransaction::RollbackToSavePoint will also call
  // RollbackToSavepoint on write_batch_. However, write_batch_ is empty and has
  // no savepoints because this savepoint has already been flushed. Work around
  // this by setting a fake savepoint.
  write_batch_.SetSavePoint();
  s = PessimisticTransaction::RollbackToSavePoint();
  assert(s.ok());
  if (!s.ok()) {
    return s;
  }

  flushed_save_points_->pop_back();
  return s;
}

Status WriteUnpreparedTxn::PopSavePoint() {
  assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
             (flushed_save_points_ ? flushed_save_points_->size() : 0) ==
         (save_points_ ? save_points_->size() : 0));
  if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    unflushed_save_points_->pop_back();
    return s;
  }

  if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
    // PessimisticTransaction::PopSavePoint will also call PopSavePoint on
    // write_batch_. However, write_batch_ is empty and has no savepoints
    // because this savepoint has already been flushed. Work around this by
    // setting a fake savepoint.
    write_batch_.SetSavePoint();
    Status s = PessimisticTransaction::PopSavePoint();
    assert(!s.IsNotFound());
    flushed_save_points_->pop_back();
    return s;
  }

  return Status::NotFound();
}

void WriteUnpreparedTxn::MultiGet(const ReadOptions& _read_options,
                                  ColumnFamilyHandle* column_family,
                                  const size_t num_keys, const Slice* keys,
                                  PinnableSlice* values, Status* statuses,
                                  const bool sorted_input) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
    Status s = Status::InvalidArgument(
        "Can only call MultiGet with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");

    for (size_t i = 0; i < num_keys; ++i) {
      if (statuses[i].ok()) {
        statuses[i] = s;
      }
    }
    return;
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kMultiGet;
  }
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(
      read_options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
                                      num_keys, keys, values, statuses,
                                      sorted_input, &callback);
  if (UNLIKELY(!callback.valid() ||
               !wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    for (size_t i = 0; i < num_keys; i++) {
      statuses[i] = Status::TryAgain();
    }
  }
}

Status WriteUnpreparedTxn::Get(const ReadOptions& _read_options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kGet) {
    return Status::InvalidArgument(
        "Can only call Get with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
  }
  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kGet;
  }

  return GetImpl(read_options, column_family, key, value);
}

Status WriteUnpreparedTxn::GetImpl(const ReadOptions& options,
                                   ColumnFamilyHandle* column_family,
                                   const Slice& key, PinnableSlice* value) {
  SequenceNumber min_uncommitted, snap_seq;
  const SnapshotBackup backed_by_snapshot =
      wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
  WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
                                          unprep_seqs_, backed_by_snapshot);
  auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
                                            value, &callback);
  if (LIKELY(callback.valid() &&
             wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
    return res;
  } else {
    res.PermitUncheckedError();
    wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
    return Status::TryAgain();
  }
}

namespace {
static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
  auto txn = reinterpret_cast<WriteUnpreparedTxn*>(arg1);
  auto iter = reinterpret_cast<Iterator*>(arg2);
  txn->RemoveActiveIterator(iter);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
  return GetIterator(options, wupt_db_->DefaultColumnFamily());
}

Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  // Make sure to get iterator from WriteUnpreparedTxnDB, not the root db.
  Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
  assert(db_iter);

  auto iter =
      write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
  active_iterators_.push_back(iter);
  iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
  return iter;
}

Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
                                            const Slice& key,
                                            SequenceNumber* tracked_at_seq) {
  // TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
  assert(snapshot_);

  SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
          ->min_uncommitted_;
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
  // tracked_at_seq is either max or the last snapshot with which this key was
  // tracked, so there is no need to apply IsInSnapshot to this comparison
  // here, as tracked_at_seq is not a prepare seq.
  if (*tracked_at_seq <= snap_seq) {
    // If the key has been previously validated at a sequence number earlier
    // than the current snapshot's sequence number, we already know it has not
    // been modified.
    return Status::OK();
  }

  *tracked_at_seq = snap_seq;

  ColumnFamilyHandle* cfh =
      column_family ? column_family : db_impl_->DefaultColumnFamily();

  WriteUnpreparedTxnReadCallback snap_checker(
      wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
  // TODO(yanqin): Support user-defined timestamp.
  return TransactionUtil::CheckKeyForConflicts(
      db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
      false /* cache_only */, &snap_checker, min_uncommitted);
}

const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
  return unprep_seqs_;
}

}  // namespace ROCKSDB_NAMESPACE