rocksdb/utilities/transactions/write_unprepared_txn.cc
Jay Huh b99baa5ae7 Do not add unprep_seqs when WriteImpl() fails in unprepared txn (#12927)
Summary:
With the following scenario, we see an assertion failure in write_unprepared_txn:

1. The write is the first one in the transaction (`log_number_` is not set yet - https://github.com/facebook/rocksdb/blob/main/utilities/transactions/write_unprepared_txn.cc#L376-L379)
2. `WriteToWAL()` fails inside `db_impl_->WriteImpl()` due to fault injection. `last_log_number_` is 0. https://github.com/facebook/rocksdb/blob/main/utilities/transactions/write_unprepared_txn.cc#L386
3. `prepare_batch_cnt_` is still added to `unprep_seqs_` https://github.com/facebook/rocksdb/blob/main/utilities/transactions/write_unprepared_txn.cc#L395-L398
4. When the transaction gets destructed after the failed write, it expects `log_number_ > 0` if `unprep_seqs_` is not empty - https://github.com/facebook/rocksdb/blob/main/utilities/transactions/write_unprepared_txn.cc#L54-L55.

Step 3 is wrong: `unprep_seqs_` is the ordered list of unprep sequence numbers that have already been written to the DB. If `db_impl_->WriteImpl()` fails, `unprep_seqs_` should not be updated; the `prepare_seq` value would be wrong anyway.
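
The essence of the fix, as a minimal sketch of the guarded update in `FlushWriteBatchToDBInternal()` (the full function appears in the source below):

```
// Record unprep seqs only for batches that were actually written to the DB;
// on a failed WriteImpl(), prepare_seq is invalid and log_number_ may still
// be 0, which would trip the destructor's assertion.
if (s.ok()) {
  unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
}
```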

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12927

Test Plan:
The following command steadily reproduces the issue; it no longer fails after the fix.
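The flags most relevant to the repro are presumably `--use_txn=1` with `--txn_write_policy=2` (WriteUnprepared) and `--write_fault_one_in=128` (write fault injection):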
```
./db_stress \
  --WAL_size_limit_MB=0 \
  --WAL_ttl_seconds=60 \
  --acquire_snapshot_one_in=10000 \
  --adaptive_readahead=1 \
  --adm_policy=2 \
  --advise_random_on_open=0 \
  --allow_data_in_errors=True \
  --allow_fallocate=1 \
  --async_io=1 \
  --auto_readahead_size=0 \
  --avoid_flush_during_recovery=0 \
  --avoid_flush_during_shutdown=1 \
  --avoid_unnecessary_blocking_io=0 \
  --backup_max_size=104857600 \
  --backup_one_in=100000 \
  --batch_protection_bytes_per_key=8 \
  --bgerror_resume_retry_interval=100 \
  --block_align=1 \
  --block_protection_bytes_per_key=8 \
  --block_size=16384 \
  --bloom_before_level=7 \
  --bloom_bits=4.1280979878467345 \
  --bottommost_compression_type=none \
  --bottommost_file_compaction_delay=600 \
  --bytes_per_sync=262144 \
  --cache_index_and_filter_blocks=1 \
  --cache_index_and_filter_blocks_with_high_priority=0 \
  --cache_size=33554432 \
  --cache_type=lru_cache \
  --charge_compression_dictionary_building_buffer=1 \
  --charge_file_metadata=1 \
  --charge_filter_construction=1 \
  --charge_table_reader=1 \
  --check_multiget_consistency=0 \
  --check_multiget_entity_consistency=1 \
  --checkpoint_one_in=0 \
  --checksum_type=kXXH3 \
  --clear_column_family_one_in=0 \
  --compact_files_one_in=1000000 \
  --compact_range_one_in=1000000 \
  --compaction_pri=4 \
  --compaction_readahead_size=1048576 \
  --compaction_ttl=0 \
  --compress_format_version=1 \
  --compressed_secondary_cache_ratio=0.0 \
  --compressed_secondary_cache_size=0 \
  --compression_checksum=1 \
  --compression_max_dict_buffer_bytes=4294967295 \
  --compression_max_dict_bytes=16384 \
  --compression_parallel_threads=8 \
  --compression_type=none \
  --compression_use_zstd_dict_trainer=1 \
  --compression_zstd_max_train_bytes=0 \
  --continuous_verification_interval=0 \
  --create_timestamped_snapshot_one_in=0 \
  --daily_offpeak_time_utc=04:00-08:00 \
  --data_block_index_type=0 \
  --db=$db \
  --db_write_buffer_size=8388608 \
  --default_temperature=kHot \
  --default_write_temperature=kUnknown \
  --delete_obsolete_files_period_micros=21600000000 \
  --delpercent=5 \
  --delrangepercent=0 \
  --destroy_db_initially=0 \
  --detect_filter_construct_corruption=0 \
  --disable_file_deletions_one_in=10000 \
  --disable_manual_compaction_one_in=10000 \
  --disable_wal=0 \
  --dump_malloc_stats=1 \
  --enable_checksum_handoff=1 \
  --enable_compaction_filter=0 \
  --enable_custom_split_merge=1 \
  --enable_do_not_compress_roles=0 \
  --enable_index_compression=1 \
  --enable_memtable_insert_with_hint_prefix_extractor=0 \
  --enable_pipelined_write=0 \
  --enable_sst_partitioner_factory=0 \
  --enable_thread_tracking=1 \
  --enable_write_thread_adaptive_yield=1 \
  --error_recovery_with_no_fault_injection=1 \
  --exclude_wal_from_write_fault_injection=0 \
  --expected_values_dir=$exp \
  --fail_if_options_file_error=0 \
  --fifo_allow_compaction=1 \
  --file_checksum_impl=crc32c \
  --fill_cache=1 \
  --flush_one_in=1000 \
  --format_version=2 \
  --get_all_column_family_metadata_one_in=1000000 \
  --get_current_wal_file_one_in=0 \
  --get_live_files_apis_one_in=1000000 \
  --get_properties_of_all_tables_one_in=100000 \
  --get_property_one_in=1000000 \
  --get_sorted_wal_files_one_in=0 \
  --hard_pending_compaction_bytes_limit=274877906944 \
  --high_pri_pool_ratio=0 \
  --index_block_restart_interval=15 \
  --index_shortening=0 \
  --index_type=2 \
  --ingest_external_file_one_in=0 \
  --initial_auto_readahead_size=524288 \
  --inplace_update_support=0 \
  --iterpercent=10 \
  --key_len_percent_dist=1,30,69 \
  --key_may_exist_one_in=100000 \
  --kill_random_test=888887 \
  --last_level_temperature=kCold \
  --level_compaction_dynamic_level_bytes=1 \
  --lock_wal_one_in=1000000 \
  --log2_keys_per_lock=10 \
  --log_file_time_to_roll=0 \
  --log_readahead_size=0 \
  --long_running_snapshots=1 \
  --low_pri_pool_ratio=0 \
  --lowest_used_cache_tier=1 \
  --manifest_preallocation_size=5120 \
  --manual_wal_flush_one_in=0 \
  --mark_for_compaction_one_file_in=10 \
  --max_auto_readahead_size=0 \
  --max_background_compactions=20 \
  --max_bytes_for_level_base=10485760 \
  --max_key=100000 \
  --max_key_len=3 \
  --max_log_file_size=1048576 \
  --max_manifest_file_size=1073741824 \
  --max_sequential_skip_in_iterations=2 \
  --max_total_wal_size=0 \
  --max_write_batch_group_size_bytes=16 \
  --max_write_buffer_number=10 \
  --max_write_buffer_size_to_maintain=0 \
  --memtable_insert_hint_per_batch=0 \
  --memtable_max_range_deletions=1000 \
  --memtable_prefix_bloom_size_ratio=0.1 \
  --memtable_protection_bytes_per_key=8 \
  --memtable_whole_key_filtering=1 \
  --memtablerep=skip_list \
  --metadata_charge_policy=0 \
  --metadata_read_fault_one_in=32 \
  --metadata_write_fault_one_in=0 \
  --min_write_buffer_number_to_merge=2 \
  --mmap_read=1 \
  --mock_direct_io=False \
  --nooverwritepercent=1 \
  --num_file_reads_for_auto_readahead=2 \
  --open_files=-1 \
  --open_metadata_read_fault_one_in=0 \
  --open_metadata_write_fault_one_in=0 \
  --open_read_fault_one_in=0 \
  --open_write_fault_one_in=0 \
  --ops_per_thread=20000000 \
  --optimize_filters_for_hits=1 \
  --optimize_filters_for_memory=0 \
  --optimize_multiget_for_io=0 \
  --paranoid_file_checks=1 \
  --partition_filters=0 \
  --partition_pinning=0 \
  --pause_background_one_in=1000000 \
  --periodic_compaction_seconds=1 \
  --prefix_size=5 \
  --prefixpercent=5 \
  --prepopulate_block_cache=0 \
  --preserve_internal_time_seconds=60 \
  --progress_reports=0 \
  --promote_l0_one_in=0 \
  --read_amp_bytes_per_bit=32 \
  --read_fault_one_in=1000 \
  --readahead_size=0 \
  --readpercent=45 \
  --recycle_log_file_num=1 \
  --reopen=20 \
  --report_bg_io_stats=1 \
  --reset_stats_one_in=10000 \
  --sample_for_compression=0 \
  --secondary_cache_fault_one_in=0 \
  --sync=0 \
  --sync_fault_injection=0 \
  --table_cache_numshardbits=6 \
  --target_file_size_base=524288 \
  --target_file_size_multiplier=2 \
  --test_batches_snapshots=0 \
  --top_level_index_pinning=0 \
  --txn_write_policy=2 \
  --uncache_aggressiveness=126 \
  --universal_max_read_amp=4 \
  --unordered_write=0 \
  --unpartitioned_pinning=1 \
  --use_adaptive_mutex=0 \
  --use_adaptive_mutex_lru=1 \
  --use_attribute_group=1 \
  --use_delta_encoding=1 \
  --use_direct_io_for_flush_and_compaction=0 \
  --use_direct_reads=0 \
  --use_full_merge_v1=0 \
  --use_get_entity=0 \
  --use_merge=1 \
  --use_multi_cf_iterator=0 \
  --use_multi_get_entity=0 \
  --use_multiget=1 \
  --use_optimistic_txn=0 \
  --use_put_entity_one_in=0 \
  --use_timed_put_one_in=0 \
  --use_txn=1 \
  --use_write_buffer_manager=0 \
  --user_timestamp_size=0 \
  --value_size_mult=32 \
  --verification_only=0 \
  --verify_checksum=1 \
  --verify_checksum_one_in=1000000 \
  --verify_compression=0 \
  --verify_db_one_in=10000 \
  --verify_file_checksums_one_in=1000000 \
  --verify_iterator_with_expected_state_one_in=5 \
  --verify_sst_unique_id_in_manifest=1 \
  --wal_bytes_per_sync=0 \
  --wal_compression=none \
  --write_buffer_size=4194304 \
  --write_dbid_to_manifest=1 \
  --write_fault_one_in=128 \
  --writepercent=35
```

Reviewed By: cbi42

Differential Revision: D61048774

Pulled By: jaykorean

fbshipit-source-id: 22200d55fd0b22b68732b12516e681a6c6e2c601
2024-08-15 09:16:29 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/transactions/write_unprepared_txn.h"
#include "db/db_impl/db_impl.h"
#include "util/cast_util.h"
#include "utilities/transactions/write_unprepared_txn_db.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace ROCKSDB_NAMESPACE {
bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(SequenceNumber seq) {
// Since unprep_seqs maps prep_seq => prepare_batch_cnt, to check if seq is
// in unprep_seqs, we have to check if seq is equal to prep_seq or any of
// the prepare_batch_cnt seq nums after it.
//
// TODO(lth): Can be optimized with std::lower_bound if unprep_seqs is
// large.
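//
// For example, with unprep_seqs_ = {{10, 2}}, seqs 10 and 11 belong to this
// transaction's own unprepared batches and are treated as visible.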
for (const auto& it : unprep_seqs_) {
if (it.first <= seq && seq < it.first + it.second) {
return true;
}
}
bool snap_released = false;
auto ret =
db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_, &snap_released);
assert(!snap_released || backed_by_snapshot_ == kUnbackedByDBSnapshot);
snap_released_ |= snap_released;
return ret;
}
WriteUnpreparedTxn::WriteUnpreparedTxn(WriteUnpreparedTxnDB* txn_db,
const WriteOptions& write_options,
const TransactionOptions& txn_options)
: WritePreparedTxn(txn_db, write_options, txn_options),
wupt_db_(txn_db),
last_log_number_(0),
recovered_txn_(false),
largest_validated_seq_(0) {
if (txn_options.write_batch_flush_threshold < 0) {
write_batch_flush_threshold_ =
txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
} else {
write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
}
}
WriteUnpreparedTxn::~WriteUnpreparedTxn() {
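// Invariant (see #12927): a non-empty unprep_seqs_ implies that unprepared
// batches were successfully written to the DB, and thus that log_number_ was
// set by FlushWriteBatchToDBInternal().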
if (!unprep_seqs_.empty()) {
assert(log_number_ > 0);
assert(GetId() > 0);
assert(!name_.empty());
// We should roll back regardless of GetState, but some unit tests that
// test crash recovery run the destructor assuming that rollback does not
// happen, so that rollback during recovery can be exercised.
if (GetState() == STARTED || GetState() == LOCKS_STOLEN) {
auto s = RollbackInternal();
assert(s.ok());
if (!s.ok()) {
ROCKS_LOG_FATAL(
wupt_db_->info_log_,
"Rollback of WriteUnprepared transaction failed in destructor: %s",
s.ToString().c_str());
}
dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
log_number_);
}
}
// Clear the tracked locks so that ~PessimisticTransaction does not
// try to unlock keys for recovered transactions.
if (recovered_txn_) {
tracked_locks_->Clear();
}
}
void WriteUnpreparedTxn::Initialize(const TransactionOptions& txn_options) {
PessimisticTransaction::Initialize(txn_options);
if (txn_options.write_batch_flush_threshold < 0) {
write_batch_flush_threshold_ =
txn_db_impl_->GetTxnDBOptions().default_write_batch_flush_threshold;
} else {
write_batch_flush_threshold_ = txn_options.write_batch_flush_threshold;
}
unprep_seqs_.clear();
flushed_save_points_.reset(nullptr);
unflushed_save_points_.reset(nullptr);
recovered_txn_ = false;
largest_validated_seq_ = 0;
assert(active_iterators_.empty());
active_iterators_.clear();
untracked_keys_.clear();
}
Status WriteUnpreparedTxn::HandleWrite(std::function<Status()> do_write) {
Status s;
if (active_iterators_.empty()) {
s = MaybeFlushWriteBatchToDB();
if (!s.ok()) {
return s;
}
}
s = do_write();
if (s.ok()) {
if (snapshot_) {
largest_validated_seq_ =
std::max(largest_validated_seq_, snapshot_->GetSequenceNumber());
} else {
// TODO(lth): We should use the same number as tracked_at_seq in TryLock,
// because what is actually being tracked is the sequence number at which
// this key was locked.
largest_validated_seq_ = db_impl_->GetLastPublishedSequence();
}
}
return s;
}
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
});
}
Status WriteUnpreparedTxn::Put(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value,
const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::Put(column_family, key, value, assume_tracked);
});
}
Status WriteUnpreparedTxn::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value,
const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::Merge(column_family, key, value,
assume_tracked);
});
}
Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
const Slice& key, const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
});
}
Status WriteUnpreparedTxn::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key,
const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::Delete(column_family, key, assume_tracked);
});
}
Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key,
const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::SingleDelete(column_family, key,
assume_tracked);
});
}
Status WriteUnpreparedTxn::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key,
const bool assume_tracked) {
return HandleWrite([&]() {
return TransactionBaseImpl::SingleDelete(column_family, key,
assume_tracked);
});
}
// WriteUnpreparedTxn::RebuildFromWriteBatch is only called on recovery. For
// WriteUnprepared, the write batches have already been written into the
// database during WAL replay, so all we have to do is "retrack" the keys
// so that rollbacks are possible.
//
// Calling TryLock instead of TrackKey is also possible, but as an optimization,
// recovered transactions do not hold locks on their keys. This follows the
// implementation in PessimisticTransactionDB::Initialize where we set
// skip_concurrency_control to true.
Status WriteUnpreparedTxn::RebuildFromWriteBatch(WriteBatch* wb) {
struct TrackKeyHandler : public WriteBatch::Handler {
WriteUnpreparedTxn* txn_;
bool rollback_merge_operands_;
TrackKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
: txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
return Status::OK();
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
return Status::OK();
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
return Status::OK();
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
if (rollback_merge_operands_) {
txn_->TrackKey(cf, key.ToString(), kMaxSequenceNumber,
false /* read_only */, true /* exclusive */);
}
return Status::OK();
}
// Recovered batches do not contain 2PC markers.
Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkNoop(bool) override { return Status::InvalidArgument(); }
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
};
TrackKeyHandler handler(this,
wupt_db_->txn_db_options_.rollback_merge_operands);
return wb->Iterate(&handler);
}
Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
const bool kPrepared = true;
Status s;
if (write_batch_flush_threshold_ > 0 &&
write_batch_.GetWriteBatch()->Count() > 0 &&
write_batch_.GetDataSize() >
static_cast<size_t>(write_batch_flush_threshold_)) {
assert(GetState() != PREPARED);
s = FlushWriteBatchToDB(!kPrepared);
}
return s;
}
Status WriteUnpreparedTxn::FlushWriteBatchToDB(bool prepared) {
// If the current write batch contains savepoints, then some special handling
// is required so that RollbackToSavepoint can work.
//
// RollbackToSavepoint is not supported after Prepare() is called, so only do
// this for unprepared batches.
if (!prepared && unflushed_save_points_ != nullptr &&
!unflushed_save_points_->empty()) {
return FlushWriteBatchWithSavePointToDB();
}
return FlushWriteBatchToDBInternal(prepared);
}
Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
if (name_.empty()) {
assert(!prepared);
#ifndef NDEBUG
static std::atomic_ullong autogen_id{0};
// To avoid changing all tests to call SetName, just autogenerate one.
if (wupt_db_->txn_db_options_.autogenerate_name) {
auto s = SetName(std::string("autoxid") +
std::to_string(autogen_id.fetch_add(1)));
assert(s.ok());
} else
#endif
{
return Status::InvalidArgument("Cannot write to DB without SetName.");
}
}
struct UntrackedKeyHandler : public WriteBatch::Handler {
WriteUnpreparedTxn* txn_;
bool rollback_merge_operands_;
UntrackedKeyHandler(WriteUnpreparedTxn* txn, bool rollback_merge_operands)
: txn_(txn), rollback_merge_operands_(rollback_merge_operands) {}
Status AddUntrackedKey(uint32_t cf, const Slice& key) {
auto str = key.ToString();
PointLockStatus lock_status =
txn_->tracked_locks_->GetPointLockStatus(cf, str);
if (!lock_status.locked) {
txn_->untracked_keys_[cf].push_back(str);
}
return Status::OK();
}
Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
return AddUntrackedKey(cf, key);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return AddUntrackedKey(cf, key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return AddUntrackedKey(cf, key);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
if (rollback_merge_operands_) {
return AddUntrackedKey(cf, key);
}
return Status::OK();
}
// The only expected 2PC marker is the initial Noop marker.
Status MarkNoop(bool empty_batch) override {
return empty_batch ? Status::OK() : Status::InvalidArgument();
}
Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
};
UntrackedKeyHandler handler(
this, wupt_db_->txn_db_options_.rollback_merge_operands);
auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&handler);
assert(s.ok());
// TODO(lth): Reduce duplicate code with WritePrepared prepare logic.
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
const bool WRITE_AFTER_COMMIT = true;
const bool first_prepare_batch = log_number_ == 0;
// MarkEndPrepare will change the Noop marker to the appropriate marker.
s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
name_, !WRITE_AFTER_COMMIT, !prepared);
assert(s.ok());
// For each duplicate key we account for a new sub-batch
prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
// AddPrepared is better called in the pre-release callback; otherwise there
// is a non-zero chance of max advancing past prepare_seq, with readers then
// assuming the data is committed.
// Also having it in the PreReleaseCallback allows in-order addition of
// prepared entries to PreparedHeap and hence enables an optimization. Refer
// to SmallestUnCommittedSeq for more details.
AddPreparedCallback add_prepared_callback(
wpt_db_, db_impl_, prepare_batch_cnt_,
db_impl_->immutable_db_options().two_write_queues, first_prepare_batch);
const bool DISABLE_MEMTABLE = true;
uint64_t seq_used = kMaxSequenceNumber;
// log_number_ should refer to the oldest log containing uncommitted data
// from the current transaction. This means that if log_number_ is already
// set, it must not be overwritten by the log used for this write (see the
// check after WriteImpl below).
s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, /*user_write_cb=*/nullptr,
&last_log_number_,
/*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
prepare_batch_cnt_, &add_prepared_callback);
if (log_number_ == 0) {
log_number_ = last_log_number_;
}
assert(!s.ok() || seq_used != kMaxSequenceNumber);
auto prepare_seq = seq_used;
// Only call SetId if it hasn't been set yet.
if (GetId() == 0) {
SetId(prepare_seq);
}
// unprep_seqs_ will also contain prepared seqnos since they are treated in
// the same way in the prepare/commit callbacks. See the comment on the
// definition of unprep_seqs_.
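// Only record unprep seqs for batches that were actually written to the DB:
// if WriteImpl() failed, log_number_ may still be 0 and prepare_seq would be
// invalid, and a stale entry here would trip the destructor's invariant
// (see #12927).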
if (s.ok()) {
unprep_seqs_[prepare_seq] = prepare_batch_cnt_;
}
// Reset transaction state.
if (!prepared) {
prepare_batch_cnt_ = 0;
const bool kClear = true;
TransactionBaseImpl::InitWriteBatch(kClear);
}
return s;
}
Status WriteUnpreparedTxn::FlushWriteBatchWithSavePointToDB() {
assert(unflushed_save_points_ != nullptr &&
unflushed_save_points_->size() > 0);
assert(save_points_ != nullptr && save_points_->size() > 0);
assert(save_points_->size() >= unflushed_save_points_->size());
// Handler class for creating an unprepared batch from a savepoint.
struct SavePointBatchHandler : public WriteBatch::Handler {
WriteBatchWithIndex* wb_;
const std::map<uint32_t, ColumnFamilyHandle*>& handles_;
SavePointBatchHandler(
WriteBatchWithIndex* wb,
const std::map<uint32_t, ColumnFamilyHandle*>& handles)
: wb_(wb), handles_(handles) {}
Status PutCF(uint32_t cf, const Slice& key, const Slice& value) override {
return wb_->Put(handles_.at(cf), key, value);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return wb_->Delete(handles_.at(cf), key);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return wb_->SingleDelete(handles_.at(cf), key);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice& value) override {
return wb_->Merge(handles_.at(cf), key, value);
}
// The only expected 2PC marker is the initial Noop marker.
Status MarkNoop(bool empty_batch) override {
return empty_batch ? Status::OK() : Status::InvalidArgument();
}
Status MarkBeginPrepare(bool) override { return Status::InvalidArgument(); }
Status MarkEndPrepare(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkCommit(const Slice&) override {
return Status::InvalidArgument();
}
Status MarkRollback(const Slice&) override {
return Status::InvalidArgument();
}
};
// The comparator of the default cf is passed in, similar to the
// initialization of TransactionBaseImpl::write_batch_. This comparator is
// only used as a fallback when the write batch encounters an invalid cf id.
WriteBatchWithIndex wb(wpt_db_->DefaultColumnFamily()->GetComparator(), 0,
true, 0, write_options_.protection_bytes_per_key);
// Swap with write_batch_ so that wb contains the complete write batch. The
// actual write batch that will be flushed to DB will be built in
// write_batch_, and will be read by FlushWriteBatchToDBInternal.
std::swap(wb, write_batch_);
TransactionBaseImpl::InitWriteBatch();
size_t prev_boundary = WriteBatchInternal::kHeader;
const bool kPrepared = true;
for (size_t i = 0; i < unflushed_save_points_->size() + 1; i++) {
bool trailing_batch = i == unflushed_save_points_->size();
SavePointBatchHandler sp_handler(&write_batch_,
*wupt_db_->GetCFHandleMap().get());
size_t curr_boundary = trailing_batch ? wb.GetWriteBatch()->GetDataSize()
: (*unflushed_save_points_)[i];
// Construct the partial write batch up to the savepoint.
//
// Theoretically, a memcpy between the write batches should be sufficient
// since the rewriting into the batch should produce the exact same byte
// representation. Rebuilding the WriteBatchWithIndex index is still
// necessary, though, and would imply doing two passes over the batch.
Status s = WriteBatchInternal::Iterate(wb.GetWriteBatch(), &sp_handler,
prev_boundary, curr_boundary);
if (!s.ok()) {
return s;
}
if (write_batch_.GetWriteBatch()->Count() > 0) {
// Flush the write batch.
s = FlushWriteBatchToDBInternal(!kPrepared);
if (!s.ok()) {
return s;
}
}
if (!trailing_batch) {
if (flushed_save_points_ == nullptr) {
flushed_save_points_.reset(
new autovector<WriteUnpreparedTxn::SavePoint>());
}
flushed_save_points_->emplace_back(
unprep_seqs_, new ManagedSnapshot(db_impl_, wupt_db_->GetSnapshot()));
}
prev_boundary = curr_boundary;
const bool kClear = true;
TransactionBaseImpl::InitWriteBatch(kClear);
}
unflushed_save_points_->clear();
return Status::OK();
}
Status WriteUnpreparedTxn::PrepareInternal() {
const bool kPrepared = true;
return FlushWriteBatchToDB(kPrepared);
}
Status WriteUnpreparedTxn::CommitWithoutPrepareInternal() {
if (unprep_seqs_.empty()) {
assert(log_number_ == 0);
assert(GetId() == 0);
return WritePreparedTxn::CommitWithoutPrepareInternal();
}
// TODO(lth): We should optimize commit without prepare to not perform
// a prepare under the hood.
auto s = PrepareInternal();
if (!s.ok()) {
return s;
}
return CommitInternal();
}
Status WriteUnpreparedTxn::CommitInternal() {
// TODO(lth): Reduce duplicate code with WritePrepared commit logic.
// We take the commit-time batch and append the Commit marker. The Memtable
// will ignore the Commit marker in non-recovery mode
WriteBatch* working_batch = GetCommitTimeWriteBatch();
const bool empty = working_batch->Count() == 0;
auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
assert(s.ok());
const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
if (!empty) {
// When not writing to memtable, we can still cache the latest write batch.
// The cached batch will be written to memtable in WriteRecoverableState
// during FlushMemTable
if (for_recovery) {
WriteBatchInternal::SetAsLatestPersistentState(working_batch);
} else {
return Status::InvalidArgument(
"Commit-time-batch can only be used if "
"use_only_the_last_commit_time_batch_for_recovery is true");
}
}
const bool includes_data = !empty && !for_recovery;
size_t commit_batch_cnt = 0;
if (UNLIKELY(includes_data)) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Duplicate key overhead");
SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
s = working_batch->Iterate(&counter);
assert(s.ok());
commit_batch_cnt = counter.BatchCount();
}
const bool disable_memtable = !includes_data;
const bool do_one_write =
!db_impl_->immutable_db_options().two_write_queues || disable_memtable;
WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, unprep_seqs_, commit_batch_cnt);
const bool kFirstPrepareBatch = true;
AddPreparedCallback add_prepared_callback(
wpt_db_, db_impl_, commit_batch_cnt,
db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
PreReleaseCallback* pre_release_callback;
if (do_one_write) {
pre_release_callback = &update_commit_map;
} else {
pre_release_callback = &add_prepared_callback;
}
uint64_t seq_used = kMaxSequenceNumber;
// Since the prepared batch is directly written to memtable, there is
// already a connection between the memtable and its WAL, so there is no
// need to redundantly reference the log that contains the prepared data.
const uint64_t zero_log_number = 0ull;
size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
nullptr, zero_log_number, disable_memtable, &seq_used,
batch_cnt, pre_release_callback);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
const SequenceNumber commit_batch_seq = seq_used;
if (LIKELY(do_one_write || !s.ok())) {
if (LIKELY(s.ok())) {
// Note RemovePrepared should be called after the WriteImpl that published
// the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
for (const auto& seq : unprep_seqs_) {
wpt_db_->RemovePrepared(seq.first, seq.second);
}
}
if (UNLIKELY(!do_one_write)) {
wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
}
unprep_seqs_.clear();
flushed_save_points_.reset(nullptr);
unflushed_save_points_.reset(nullptr);
return s;
} // else do the 2nd write to publish seq
// Populate unprep_seqs_ with commit_batch_seq, since we treat data in the
// commit write batch as just another "unprepared" batch. This will also
// update the unprep_seqs_ in the update_commit_map callback.
unprep_seqs_[commit_batch_seq] = commit_batch_cnt;
WriteUnpreparedCommitEntryPreReleaseCallback
update_commit_map_with_commit_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
// Note: the 2nd write comes with a performance penalty. So if we have too
// many commits accompanied by CommitTimeWriteBatch and yet we cannot
// enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
// two_write_queues should be disabled to avoid many additional writes here.
// Update commit map only from the 2nd queue
WriteBatch empty_batch;
s = empty_batch.PutLogData(Slice());
assert(s.ok());
// In the absence of Prepare markers, use Noop as a batch separator
s = WriteBatchInternal::InsertNoop(&empty_batch);
assert(s.ok());
const bool DISABLE_MEMTABLE = true;
const size_t ONE_BATCH = 1;
const uint64_t NO_REF_LOG = 0;
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used,
ONE_BATCH, &update_commit_map_with_commit_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Note RemovePrepared should be called after the WriteImpl that published
// the seq. Otherwise the SmallestUnCommittedSeq optimization breaks.
for (const auto& seq : unprep_seqs_) {
wpt_db_->RemovePrepared(seq.first, seq.second);
}
unprep_seqs_.clear();
flushed_save_points_.reset(nullptr);
unflushed_save_points_.reset(nullptr);
return s;
}
Status WriteUnpreparedTxn::WriteRollbackKeys(
const LockTracker& lock_tracker, WriteBatchWithIndex* rollback_batch,
ReadCallback* callback, const ReadOptions& roptions) {
// This assertion can be removed when range lock is supported.
assert(lock_tracker.IsPointLockSupported());
const auto& cf_map = *wupt_db_->GetCFHandleMap();
auto WriteRollbackKey = [&](const std::string& key, uint32_t cfid) {
const auto& cf_handle = cf_map.at(cfid);
PinnableSlice pinnable_val;
bool not_used;
DBImpl::GetImplOptions get_impl_options;
get_impl_options.column_family = cf_handle;
get_impl_options.value = &pinnable_val;
get_impl_options.value_found = &not_used;
get_impl_options.callback = callback;
auto s = db_impl_->GetImpl(roptions, key, get_impl_options);
if (s.ok()) {
s = rollback_batch->Put(cf_handle, key, pinnable_val);
assert(s.ok());
} else if (s.IsNotFound()) {
if (wupt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
s = rollback_batch->SingleDelete(cf_handle, key);
} else {
s = rollback_batch->Delete(cf_handle, key);
}
assert(s.ok());
} else {
return s;
}
return Status::OK();
};
std::unique_ptr<LockTracker::ColumnFamilyIterator> cf_it(
lock_tracker.GetColumnFamilyIterator());
assert(cf_it != nullptr);
while (cf_it->HasNext()) {
ColumnFamilyId cf = cf_it->Next();
std::unique_ptr<LockTracker::KeyIterator> key_it(
lock_tracker.GetKeyIterator(cf));
assert(key_it != nullptr);
while (key_it->HasNext()) {
const std::string& key = key_it->Next();
auto s = WriteRollbackKey(key, cf);
if (!s.ok()) {
return s;
}
}
}
for (const auto& cfkey : untracked_keys_) {
const auto cfid = cfkey.first;
const auto& keys = cfkey.second;
for (const auto& key : keys) {
auto s = WriteRollbackKey(key, cfid);
if (!s.ok()) {
return s;
}
}
}
return Status::OK();
}
Status WriteUnpreparedTxn::RollbackInternal() {
// TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
WriteBatchWithIndex rollback_batch(
wpt_db_->DefaultColumnFamily()->GetComparator(), 0, true, 0,
write_options_.protection_bytes_per_key);
assert(GetId() != kMaxSequenceNumber);
assert(GetId() > 0);
Status s;
auto read_at_seq = kMaxSequenceNumber;
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions roptions;
// to prevent the callback's seq from being overridden inside DBImpl::Get
roptions.snapshot = wpt_db_->GetMaxSnapshot();
// Note that we do not use WriteUnpreparedTxnReadCallback because we do not
// need to read our own writes when reading prior versions of the key for
// rollback.
WritePreparedTxnReadCallback callback(wpt_db_, read_at_seq);
// TODO(lth): We write the rollback batch all in a single batch here, but
// this should be subdivided into multiple batches as well. In phase 2, when
// key sets are read from the WAL, this will happen naturally.
s = WriteRollbackKeys(*tracked_locks_, &rollback_batch, &callback, roptions);
if (!s.ok()) {
return s;
}
// The Rollback marker will be used as a batch separator
s = WriteBatchInternal::MarkRollback(rollback_batch.GetWriteBatch(), name_);
assert(s.ok());
bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
const bool DISABLE_MEMTABLE = true;
const uint64_t NO_REF_LOG = 0;
uint64_t seq_used = kMaxSequenceNumber;
// Rollback batch may contain duplicate keys, because tracked_keys_ is not
// comparator aware.
auto rollback_batch_cnt = rollback_batch.SubBatchCnt();
// We commit the rolled back prepared batches. Although this is
// counter-intuitive, i) it is safe to do so, since the prepared batches are
// already canceled out by the rollback batch, ii) adding the commit entry to
// CommitCache will allow us to benefit from the existing mechanism in
// CommitCache that keeps an entry evicted due to max advance and yet overlaps
// with a live snapshot around so that the live snapshot properly skips the
// entry even if its prepare seq is lower than max_evicted_seq_.
//
// TODO(lth): RollbackInternal is conceptually very similar to
// CommitInternal, with the rollback batch simply taking on the role of
// CommitTimeWriteBatch. We should be able to merge the two code paths.
WriteUnpreparedCommitEntryPreReleaseCallback update_commit_map(
wpt_db_, db_impl_, unprep_seqs_, rollback_batch_cnt);
// Note: the rollback batch does not need AddPrepared since it is written to
// DB in one shot. min_uncommitted still works since it requires capturing
// data that is written to DB but not yet committed, while the rollback
// batch commits with PreReleaseCallback.
s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
nullptr, nullptr, nullptr, NO_REF_LOG,
!DISABLE_MEMTABLE, &seq_used, rollback_batch_cnt,
do_one_write ? &update_commit_map : nullptr);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
if (!s.ok()) {
return s;
}
if (do_one_write) {
for (const auto& seq : unprep_seqs_) {
wpt_db_->RemovePrepared(seq.first, seq.second);
}
unprep_seqs_.clear();
flushed_save_points_.reset(nullptr);
unflushed_save_points_.reset(nullptr);
return s;
} // else do the 2nd write for commit
uint64_t& prepare_seq = seq_used;
// Populate unprep_seqs_ with rollback_batch_cnt, since we treat data in the
// rollback write batch as just another "unprepared" batch. This will also
// update the unprep_seqs_ in the update_commit_map callback.
unprep_seqs_[prepare_seq] = rollback_batch_cnt;
WriteUnpreparedCommitEntryPreReleaseCallback
update_commit_map_with_rollback_batch(wpt_db_, db_impl_, unprep_seqs_, 0);
ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
"RollbackInternal 2nd write prepare_seq: %" PRIu64,
prepare_seq);
WriteBatch empty_batch;
const size_t ONE_BATCH = 1;
s = empty_batch.PutLogData(Slice());
assert(s.ok());
// In the absence of Prepare markers, use Noop as a batch separator
s = WriteBatchInternal::InsertNoop(&empty_batch);
assert(s.ok());
s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used,
ONE_BATCH, &update_commit_map_with_rollback_batch);
assert(!s.ok() || seq_used != kMaxSequenceNumber);
// Mark the txn as rolled back
if (s.ok()) {
for (const auto& seq : unprep_seqs_) {
wpt_db_->RemovePrepared(seq.first, seq.second);
}
}
unprep_seqs_.clear();
flushed_save_points_.reset(nullptr);
unflushed_save_points_.reset(nullptr);
return s;
}
void WriteUnpreparedTxn::Clear() {
if (!recovered_txn_) {
txn_db_impl_->UnLock(this, *tracked_locks_);
}
unprep_seqs_.clear();
flushed_save_points_.reset(nullptr);
unflushed_save_points_.reset(nullptr);
recovered_txn_ = false;
largest_validated_seq_ = 0;
for (auto& it : active_iterators_) {
auto bdit = static_cast<BaseDeltaIterator*>(it);
bdit->Invalidate(Status::InvalidArgument(
"Cannot use iterator after transaction has finished"));
}
active_iterators_.clear();
untracked_keys_.clear();
TransactionBaseImpl::Clear();
}
void WriteUnpreparedTxn::SetSavePoint() {
assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
(flushed_save_points_ ? flushed_save_points_->size() : 0) ==
(save_points_ ? save_points_->size() : 0));
PessimisticTransaction::SetSavePoint();
if (unflushed_save_points_ == nullptr) {
unflushed_save_points_.reset(new autovector<size_t>());
}
unflushed_save_points_->push_back(write_batch_.GetDataSize());
}
Status WriteUnpreparedTxn::RollbackToSavePoint() {
assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
(flushed_save_points_ ? flushed_save_points_->size() : 0) ==
(save_points_ ? save_points_->size() : 0));
if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
Status s = PessimisticTransaction::RollbackToSavePoint();
assert(!s.IsNotFound());
unflushed_save_points_->pop_back();
return s;
}
if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
return RollbackToSavePointInternal();
}
return Status::NotFound();
}
Status WriteUnpreparedTxn::RollbackToSavePointInternal() {
Status s;
const bool kClear = true;
TransactionBaseImpl::InitWriteBatch(kClear);
assert(flushed_save_points_->size() > 0);
WriteUnpreparedTxn::SavePoint& top = flushed_save_points_->back();
assert(save_points_ != nullptr && save_points_->size() > 0);
const LockTracker& tracked_keys = *save_points_->top().new_locks_;
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions roptions;
roptions.snapshot = top.snapshot_->snapshot();
SequenceNumber min_uncommitted =
static_cast_with_check<const SnapshotImpl>(roptions.snapshot)
->min_uncommitted_;
SequenceNumber snap_seq = roptions.snapshot->GetSequenceNumber();
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
top.unprep_seqs_,
kBackedByDBSnapshot);
s = WriteRollbackKeys(tracked_keys, &write_batch_, &callback, roptions);
if (!s.ok()) {
return s;
}
const bool kPrepared = true;
s = FlushWriteBatchToDBInternal(!kPrepared);
if (!s.ok()) {
return s;
}
// PessimisticTransaction::RollbackToSavePoint will also call
// RollbackToSavePoint on write_batch_. However, write_batch_ is empty and has
// no savepoints because this savepoint has already been flushed. Work around
// this by setting a fake savepoint.
write_batch_.SetSavePoint();
s = PessimisticTransaction::RollbackToSavePoint();
assert(s.ok());
if (!s.ok()) {
return s;
}
flushed_save_points_->pop_back();
return s;
}
Status WriteUnpreparedTxn::PopSavePoint() {
assert((unflushed_save_points_ ? unflushed_save_points_->size() : 0) +
(flushed_save_points_ ? flushed_save_points_->size() : 0) ==
(save_points_ ? save_points_->size() : 0));
if (unflushed_save_points_ != nullptr && unflushed_save_points_->size() > 0) {
Status s = PessimisticTransaction::PopSavePoint();
assert(!s.IsNotFound());
unflushed_save_points_->pop_back();
return s;
}
if (flushed_save_points_ != nullptr && !flushed_save_points_->empty()) {
// PessimisticTransaction::PopSavePoint will also call PopSavePoint on
// write_batch_. However, write_batch_ is empty and has no savepoints
// because this savepoint has already been flushed. Work around this by
// setting a fake savepoint.
write_batch_.SetSavePoint();
Status s = PessimisticTransaction::PopSavePoint();
assert(!s.IsNotFound());
flushed_save_points_->pop_back();
return s;
}
return Status::NotFound();
}
void WriteUnpreparedTxn::MultiGet(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const size_t num_keys, const Slice* keys,
PinnableSlice* values, Status* statuses,
const bool sorted_input) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kMultiGet) {
Status s = Status::InvalidArgument(
"Can only call MultiGet with `ReadOptions::io_activity` set to "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
for (size_t i = 0; i < num_keys; ++i) {
if (statuses[i].ok()) {
statuses[i] = s;
}
}
return;
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kMultiGet;
}
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot = wupt_db_->AssignMinMaxSeqs(
read_options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
unprep_seqs_, backed_by_snapshot);
write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
num_keys, keys, values, statuses,
sorted_input, &callback);
if (UNLIKELY(!callback.valid() ||
!wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
for (size_t i = 0; i < num_keys; i++) {
statuses[i] = Status::TryAgain();
}
}
}
Status WriteUnpreparedTxn::Get(const ReadOptions& _read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
if (_read_options.io_activity != Env::IOActivity::kUnknown &&
_read_options.io_activity != Env::IOActivity::kGet) {
return Status::InvalidArgument(
"Can only call Get with `ReadOptions::io_activity` set to "
"`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
}
ReadOptions read_options(_read_options);
if (read_options.io_activity == Env::IOActivity::kUnknown) {
read_options.io_activity = Env::IOActivity::kGet;
}
return GetImpl(read_options, column_family, key, value);
}
Status WriteUnpreparedTxn::GetImpl(const ReadOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) {
SequenceNumber min_uncommitted, snap_seq;
const SnapshotBackup backed_by_snapshot =
wupt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WriteUnpreparedTxnReadCallback callback(wupt_db_, snap_seq, min_uncommitted,
unprep_seqs_, backed_by_snapshot);
auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
value, &callback);
if (LIKELY(callback.valid() &&
wupt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
return res;
} else {
res.PermitUncheckedError();
wupt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
return Status::TryAgain();
}
}
namespace {
static void CleanupWriteUnpreparedWBWIIterator(void* arg1, void* arg2) {
auto txn = static_cast<WriteUnpreparedTxn*>(arg1);
auto iter = static_cast<Iterator*>(arg2);
txn->RemoveActiveIterator(iter);
}
} // anonymous namespace
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options) {
return GetIterator(options, wupt_db_->DefaultColumnFamily());
}
Iterator* WriteUnpreparedTxn::GetIterator(const ReadOptions& options,
ColumnFamilyHandle* column_family) {
// Make sure to get the iterator from WriteUnpreparedTxnDB, not the root db.
Iterator* db_iter = wupt_db_->NewIterator(options, column_family, this);
assert(db_iter);
auto iter =
write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
active_iterators_.push_back(iter);
iter->RegisterCleanup(CleanupWriteUnpreparedWBWIIterator, this, iter);
return iter;
}
Status WriteUnpreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
const Slice& key,
SequenceNumber* tracked_at_seq) {
// TODO(lth): Reduce duplicate code with WritePrepared ValidateSnapshot logic.
assert(snapshot_);
SequenceNumber min_uncommitted =
static_cast_with_check<const SnapshotImpl>(snapshot_.get())
->min_uncommitted_;
SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
// tracked_at_seq is either max or the last snapshot with which this key was
// tracked, so there is no need to apply IsInSnapshot to this comparison
// here, as tracked_at_seq is not a prepare seq.
if (*tracked_at_seq <= snap_seq) {
// If the key has been previously validated at a sequence number earlier
// than the current snapshot's sequence number, we already know it has not
// been modified.
return Status::OK();
}
*tracked_at_seq = snap_seq;
ColumnFamilyHandle* cfh =
column_family ? column_family : db_impl_->DefaultColumnFamily();
WriteUnpreparedTxnReadCallback snap_checker(
wupt_db_, snap_seq, min_uncommitted, unprep_seqs_, kBackedByDBSnapshot);
// TODO(yanqin): Support user-defined timestamp.
return TransactionUtil::CheckKeyForConflicts(
db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
false /* cache_only */, &snap_checker, min_uncommitted,
txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
}
const std::map<SequenceNumber, size_t>&
WriteUnpreparedTxn::GetUnpreparedSequenceNumbers() {
return unprep_seqs_;
}
} // namespace ROCKSDB_NAMESPACE