mirror of https://github.com/facebook/rocksdb.git
761 lines
31 KiB
C++
761 lines
31 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
|
|
#include "utilities/transactions/write_prepared_txn_db.h"
|
|
|
|
#include <inttypes.h>
|
|
#include <algorithm>
|
|
#include <string>
|
|
#include <unordered_set>
|
|
#include <vector>
|
|
|
|
#include "db/db_impl.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/utilities/transaction_db.h"
|
|
#include "util/cast_util.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/string_util.h"
|
|
#include "util/sync_point.h"
|
|
#include "utilities/transactions/pessimistic_transaction.h"
|
|
#include "utilities/transactions/transaction_db_mutex_impl.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
// Completes the setup of the write-prepared DB:
//  1. re-registers every sequence number of the recovered transactions as
//     prepared,
//  2. advances max_evicted_seq_ up to the latest sequence number of the DB,
//  3. installs the snapshot checker and the recoverable-state callback,
//  4. delegates the rest to PessimisticTransactionDB::Initialize, whose
//     status is returned.
Status WritePreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  auto* root_db = reinterpret_cast<DBImpl*>(GetRootDB());
  assert(root_db != nullptr);

  // A recovered transaction occupies batch_cnt_ consecutive sequence numbers
  // starting at seq_; a batch_cnt_ of zero is treated as a single one.
  const auto& recovered = root_db->recovered_transactions();
  for (const auto& name_and_txn : recovered) {
    const auto& recovered_txn = name_and_txn.second;
    auto sub_batches =
        recovered_txn->batch_cnt_ ? recovered_txn->batch_cnt_ : 1;
    for (size_t offset = 0; offset < sub_batches; ++offset) {
      AddPrepared(recovered_txn->seq_ + offset);
    }
  }

  const SequenceNumber old_max = max_evicted_seq_;
  const SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(old_max, latest_seq);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));

  // Callback that commits a single sub-batch: the batch is recorded as
  // committed at its own sequence number, with the prepare step skipped.
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}

    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled) override {
      assert(!is_mem_disabled);
      constexpr bool kPrepareSkipped = true;
      db_->AddCommitted(commit_seq, commit_seq, kPrepareSkipped);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  return PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
                                              handles);
}
|
|
|
|
// Validates column family options for use with WritePrepared transactions.
// Runs the checks of PessimisticTransactionDB first and returns their failure
// status unchanged. On top of that, the memtable factory must report that it
// can handle duplicated keys; otherwise InvalidArgument is returned.
Status WritePreparedTxnDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  Status s = PessimisticTransactionDB::VerifyCFOptions(cf_options);
  if (!s.ok()) {
    return s;
  }
  if (!cf_options.memtable_factory->CanHandleDuplicatedKey()) {
    // The message previously misspelled the policy name as "WritePrpeared".
    return Status::InvalidArgument(
        "memtable_factory->CanHandleDuplicatedKey() cannot be false with "
        "WritePrepared transactions");
  }
  return Status::OK();
}
|
|
|
|
// Starts a transaction. When old_txn is supplied it is re-initialized with
// the given options and handed back; otherwise a new WritePreparedTxn is
// allocated and returned.
Transaction* WritePreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn == nullptr) {
    return new WritePreparedTxn(this, write_options, txn_options);
  }
  ReinitializeTransaction(old_txn, write_options, txn_options);
  return old_txn;
}
|
|
|
|
// Writes a batch outside of an explicit transaction. With
// skip_concurrency_control set the rows are not locked and the batch goes
// straight to WriteInternal; otherwise the unoptimized path of
// PessimisticTransactionDB is taken.
Status WritePreparedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  if (!optimizations.skip_concurrency_control) {
    // TODO(myabandeh): Make use of skip_duplicate_key_check hint
    // Fall back to unoptimized version
    return PessimisticTransactionDB::Write(opts, updates);
  }

  // Skip locking the rows. A batch count of zero tells WriteInternal to count
  // the sub-batches itself; the duplicate-key hint lets us claim exactly one.
  constexpr size_t kUnknownBatchCnt = 0;
  constexpr size_t kOneBatchCnt = 1;
  const size_t batch_cnt = optimizations.skip_duplicate_key_check
                               ? kOneBatchCnt
                               : kUnknownBatchCnt;
  WritePreparedTxn* const no_txn = nullptr;
  return WriteInternal(opts, updates, batch_cnt, no_txn);
}
|
|
|
|
// Writes `batch` and makes it visible to readers.
//
// batch_cnt is the number of sub-batches in `batch`; zero means "unknown" and
// makes this function count them. When `txn` is non-null its id is set to the
// sequence number used by the first write.
//
// Without two_write_queues a single write does everything: the commit map is
// updated from the pre-release callback of that write. With two_write_queues
// the first write only registers the sequence numbers as prepared, and a
// second, memtable-disabled write of an empty batch publishes the commit.
Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
                                         WriteBatch* batch, size_t batch_cnt,
                                         WritePreparedTxn* txn) {
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal");
  if (batch->Count() == 0) {
    // Otherwise our 1 seq per batch logic will break since there is no seq
    // increased for this batch.
    return Status::OK();
  }
  if (batch_cnt == 0) {  // not provided, then compute it
    // TODO(myabandeh): add an option to allow user skipping this cost
    SubBatchCounter counter(*GetCFComparatorMap());
    auto s = batch->Iterate(&counter);
    assert(s.ok());
    batch_cnt = counter.BatchCount();
    // TODO(myabandeh): replace me with a stat
    ROCKS_LOG_WARN(info_log_, "Duplicate key overhead: %" PRIu64 " batches",
                   static_cast<uint64_t>(batch_cnt));
  }
  assert(batch_cnt);

  const bool two_queues = db_impl_->immutable_db_options().two_write_queues;
  const bool single_write = !two_queues;
  WriteOptions write_options(write_options_orig);
  const bool requested_sync = write_options.sync;
  if (two_queues) {
    // No need to sync on the first write
    write_options.sync = false;
  }
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(batch);

  constexpr bool kDisableMemtable = true;
  const uint64_t no_log_ref = 0;
  uint64_t seq_used = kMaxSequenceNumber;
  constexpr size_t kZeroPrepares = 0;
  // Since this is not 2pc, there is no need for AddPrepared but having it in
  // the PreReleaseCallback enables an optimization. Refer to
  // SmallestUnCommittedSeq for more details.
  AddPreparedCallback add_prepared_callback(this, batch_cnt, two_queues);
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
      this, db_impl_, kMaxSequenceNumber, kZeroPrepares, batch_cnt);
  PreReleaseCallback* const first_write_callback =
      single_write
          ? static_cast<PreReleaseCallback*>(&update_commit_map)
          : static_cast<PreReleaseCallback*>(&add_prepared_callback);

  auto s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr,
                               no_log_ref, !kDisableMemtable, &seq_used,
                               batch_cnt, first_write_callback);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  uint64_t& prepare_seq = seq_used;
  if (txn != nullptr) {
    txn->SetId(prepare_seq);
  }
  if (!s.ok() || single_write) {
    // Either the write failed, or the single write already committed.
    return s;
  }

  // Otherwise do the 2nd write for commit, with the original value of sync.
  write_options.sync = requested_sync;
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
                    "CommitBatchInternal 2nd write prepare_seq: %" PRIu64,
                    prepare_seq);
  // Commit the batch by writing an empty batch to the 2nd queue that will
  // release the commit sequence number to readers.
  constexpr size_t kZeroCommits = 0;
  constexpr bool kPrepHeapSkipped = true;
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_prepare(
      this, db_impl_, prepare_seq, batch_cnt, kZeroCommits, !kPrepHeapSkipped);
  WriteBatch empty_batch;
  empty_batch.PutLogData(Slice());
  constexpr size_t kOneBatch = 1;
  // In the absence of Prepare markers, use Noop as a batch separator
  WriteBatchInternal::InsertNoop(&empty_batch);
  s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
                          no_log_ref, kDisableMemtable, &seq_used, kOneBatch,
                          &update_commit_map_with_prepare);
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
  return s;
}
|
|
|
|
// Point lookup that filters versions through a WritePreparedTxnReadCallback.
// With a snapshot in `options`, its sequence number and min_uncommitted_ are
// used. Without one, the read is done at kMaxSequenceNumber (i.e. the latest
// committed value is acceptable) with SmallestUnCommittedSeq() as the lower
// bound of uncommitted data.
Status WritePreparedTxnDB::Get(const ReadOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, PinnableSlice* value) {
  SequenceNumber read_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;
  if (options.snapshot == nullptr) {
    min_uncommitted = SmallestUnCommittedSeq();
  } else {
    read_seq = options.snapshot->GetSequenceNumber();
    const auto* snapshot_impl =
        static_cast_with_check<const SnapshotImpl, const Snapshot>(
            options.snapshot);
    min_uncommitted = snapshot_impl->min_uncommitted_;
  }

  WritePreparedTxnReadCallback callback(this, read_seq, min_uncommitted);
  bool* dont_care = nullptr;
  // Note: no need to specify a snapshot for read options as no specific
  // snapshot is requested by the user.
  return db_impl_->GetImpl(options, column_family, key, value, dont_care,
                           &callback);
}
|
|
|
|
void WritePreparedTxnDB::UpdateCFComparatorMap(
|
|
const std::vector<ColumnFamilyHandle*>& handles) {
|
|
auto cf_map = new std::map<uint32_t, const Comparator*>();
|
|
for (auto h : handles) {
|
|
auto id = h->GetID();
|
|
const Comparator* comparator = h->GetComparator();
|
|
(*cf_map)[id] = comparator;
|
|
}
|
|
cf_map_.store(cf_map);
|
|
cf_map_gc_.reset(cf_map);
|
|
}
|
|
|
|
void WritePreparedTxnDB::UpdateCFComparatorMap(
|
|
const ColumnFamilyHandle* h) {
|
|
auto old_cf_map_ptr = cf_map_.load();
|
|
assert(old_cf_map_ptr);
|
|
auto cf_map = new std::map<uint32_t, const Comparator*>(*old_cf_map_ptr);
|
|
auto id = h->GetID();
|
|
const Comparator* comparator = h->GetComparator();
|
|
(*cf_map)[id] = comparator;
|
|
cf_map_.store(cf_map);
|
|
cf_map_gc_.reset(cf_map);
|
|
}
|
|
|
|
|
|
std::vector<Status> WritePreparedTxnDB::MultiGet(
|
|
const ReadOptions& options,
|
|
const std::vector<ColumnFamilyHandle*>& column_family,
|
|
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
|
assert(values);
|
|
size_t num_keys = keys.size();
|
|
values->resize(num_keys);
|
|
|
|
std::vector<Status> stat_list(num_keys);
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
std::string* value = values ? &(*values)[i] : nullptr;
|
|
stat_list[i] = this->Get(options, column_family[i], keys[i], value);
|
|
}
|
|
return stat_list;
|
|
}
|
|
|
|
// Struct to hold ownership of snapshot and read callback for iterator cleanup.
|
|
struct WritePreparedTxnDB::IteratorState {
|
|
IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
|
|
std::shared_ptr<ManagedSnapshot> s,
|
|
SequenceNumber min_uncommitted)
|
|
: callback(txn_db, sequence, min_uncommitted), snapshot(s) {}
|
|
|
|
WritePreparedTxnReadCallback callback;
|
|
std::shared_ptr<ManagedSnapshot> snapshot;
|
|
};
|
|
|
|
namespace {
|
|
static void CleanupWritePreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
|
|
delete reinterpret_cast<WritePreparedTxnDB::IteratorState*>(arg1);
|
|
}
|
|
} // anonymous namespace
|
|
|
|
// Creates an iterator over `column_family` that only exposes data visible at
// a snapshot. The snapshot of `options` is used when present; otherwise a
// snapshot is taken here and kept alive, through the IteratorState registered
// as a cleanup, for as long as the iterator exists.
Iterator* WritePreparedTxnDB::NewIterator(const ReadOptions& options,
                                          ColumnFamilyHandle* column_family) {
  constexpr bool kAllowBlob = true;
  constexpr bool kAllowRefresh = true;

  std::shared_ptr<ManagedSnapshot> owned_snapshot = nullptr;
  const Snapshot* read_snapshot = options.snapshot;
  if (read_snapshot == nullptr) {
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    read_snapshot = GetSnapshot();
    owned_snapshot =
        std::make_shared<ManagedSnapshot>(db_impl_, read_snapshot);
  }
  const SequenceNumber snapshot_seq = read_snapshot->GetSequenceNumber();
  const SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(read_snapshot)
          ->min_uncommitted_;
  assert(snapshot_seq != kMaxSequenceNumber);

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  auto* iter_state =
      new IteratorState(this, snapshot_seq, owned_snapshot, min_uncommitted);
  auto* iter =
      db_impl_->NewIteratorImpl(options, cfd, snapshot_seq,
                                &iter_state->callback, !kAllowBlob,
                                !kAllowRefresh);
  // The state is deleted together with the iterator.
  iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, iter_state,
                        nullptr);
  return iter;
}
|
|
|
|
// Creates one iterator per entry of `column_families`, all reading at the
// same snapshot. The snapshot of `options` is used when present; otherwise a
// snapshot is taken here and shared by the IteratorState of every iterator.
// `iterators` is cleared first and filled in the order of `column_families`.
Status WritePreparedTxnDB::NewIterators(
    const ReadOptions& options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  constexpr bool kAllowBlob = true;
  constexpr bool kAllowRefresh = true;

  std::shared_ptr<ManagedSnapshot> owned_snapshot = nullptr;
  const Snapshot* read_snapshot = options.snapshot;
  if (read_snapshot == nullptr) {
    // We take a snapshot to make sure that the related data in the commit map
    // are not deleted.
    read_snapshot = GetSnapshot();
    owned_snapshot =
        std::make_shared<ManagedSnapshot>(db_impl_, read_snapshot);
  }
  const SequenceNumber snapshot_seq = read_snapshot->GetSequenceNumber();
  const SequenceNumber min_uncommitted =
      static_cast_with_check<const SnapshotImpl, const Snapshot>(read_snapshot)
          ->min_uncommitted_;

  iterators->clear();
  iterators->reserve(column_families.size());
  for (auto* cf_handle : column_families) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
    auto* iter_state =
        new IteratorState(this, snapshot_seq, owned_snapshot, min_uncommitted);
    auto* iter =
        db_impl_->NewIteratorImpl(options, cfd, snapshot_seq,
                                  &iter_state->callback, !kAllowBlob,
                                  !kAllowRefresh);
    // The state is deleted together with the iterator.
    iter->RegisterCleanup(CleanupWritePreparedTxnDBIterator, iter_state,
                          nullptr);
    iterators->push_back(iter);
  }
  return Status::OK();
}
|
|
|
|
// Computes the step by which max_evicted_seq_ is advanced and allocates the
// value-initialized snapshot and commit caches. The options are not used.
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
  // Advance max_evicted_seq_ no more than 100 times before the cache wraps
  // around; the step is never smaller than one.
  const size_t one_percent_of_cache = COMMIT_CACHE_SIZE / 100;
  INC_STEP_FOR_MAX_EVICTED =
      std::max(one_percent_of_cache, static_cast<size_t>(1));

  snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
      new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
  commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
      new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
|
|
|
|
void WritePreparedTxnDB::AddPrepared(uint64_t seq) {
|
|
ROCKS_LOG_DETAILS(info_log_, "Txn %" PRIu64 " Prepareing", seq);
|
|
assert(seq > max_evicted_seq_);
|
|
if (seq <= max_evicted_seq_) {
|
|
throw std::runtime_error(
|
|
"Added prepare_seq is larger than max_evicted_seq_: " + ToString(seq) +
|
|
" <= " + ToString(max_evicted_seq_.load()));
|
|
}
|
|
WriteLock wl(&prepared_mutex_);
|
|
prepared_txns_.push(seq);
|
|
}
|
|
|
|
void WritePreparedTxnDB::RollbackPrepared(uint64_t prep_seq,
|
|
uint64_t /*rollback_seq*/) {
|
|
ROCKS_LOG_DETAILS(
|
|
info_log_, "Txn %" PRIu64 " rolling back with rollback seq of " PRIu64 "",
|
|
prep_seq, rollback_seq);
|
|
std::vector<SequenceNumber> snapshots =
|
|
GetSnapshotListFromDB(kMaxSequenceNumber);
|
|
// TODO(myabandeh): currently we are assuming that there is no snapshot taken
|
|
// when a transaciton is rolled back. This is the case the way MySQL does
|
|
// rollback which is after recovery. We should extend it to be able to
|
|
// rollback txns that overlap with exsiting snapshots.
|
|
assert(snapshots.size() == 0);
|
|
if (snapshots.size()) {
|
|
throw std::runtime_error(
|
|
"Rollback reqeust while there are live snapshots.");
|
|
}
|
|
WriteLock wl(&prepared_mutex_);
|
|
prepared_txns_.erase(prep_seq);
|
|
bool was_empty = delayed_prepared_.empty();
|
|
if (!was_empty) {
|
|
delayed_prepared_.erase(prep_seq);
|
|
bool is_empty = delayed_prepared_.empty();
|
|
if (was_empty != is_empty) {
|
|
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
|
|
}
|
|
}
|
|
}
|
|
|
|
void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq, uint64_t commit_seq,
|
|
bool prepare_skipped, uint8_t loop_cnt) {
|
|
ROCKS_LOG_DETAILS(info_log_,
|
|
"Txn %" PRIu64 " Committing with %" PRIu64
|
|
"(prepare_skipped=%d)",
|
|
prepare_seq, commit_seq, prepare_skipped);
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start");
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:start:pause");
|
|
auto indexed_seq = prepare_seq % COMMIT_CACHE_SIZE;
|
|
CommitEntry64b evicted_64b;
|
|
CommitEntry evicted;
|
|
bool to_be_evicted = GetCommitEntry(indexed_seq, &evicted_64b, &evicted);
|
|
if (LIKELY(to_be_evicted)) {
|
|
assert(evicted.prep_seq != prepare_seq);
|
|
auto prev_max = max_evicted_seq_.load(std::memory_order_acquire);
|
|
ROCKS_LOG_DETAILS(info_log_,
|
|
"Evicting %" PRIu64 ",%" PRIu64 " with max %" PRIu64,
|
|
evicted.prep_seq, evicted.commit_seq, prev_max);
|
|
if (prev_max < evicted.commit_seq) {
|
|
// Inc max in larger steps to avoid frequent updates
|
|
auto max_evicted_seq = evicted.commit_seq + INC_STEP_FOR_MAX_EVICTED;
|
|
AdvanceMaxEvictedSeq(prev_max, max_evicted_seq);
|
|
}
|
|
// After each eviction from commit cache, check if the commit entry should
|
|
// be kept around because it overlaps with a live snapshot.
|
|
CheckAgainstSnapshots(evicted);
|
|
}
|
|
bool succ =
|
|
ExchangeCommitEntry(indexed_seq, evicted_64b, {prepare_seq, commit_seq});
|
|
if (UNLIKELY(!succ)) {
|
|
ROCKS_LOG_ERROR(info_log_,
|
|
"ExchangeCommitEntry failed on [%" PRIu64 "] %" PRIu64
|
|
",%" PRIu64 " retrying...",
|
|
indexed_seq, prepare_seq, commit_seq);
|
|
// A very rare event, in which the commit entry is updated before we do.
|
|
// Here we apply a very simple solution of retrying.
|
|
if (loop_cnt > 100) {
|
|
throw std::runtime_error("Infinite loop in AddCommitted!");
|
|
}
|
|
AddCommitted(prepare_seq, commit_seq, prepare_skipped, ++loop_cnt);
|
|
return;
|
|
}
|
|
if (!prepare_skipped) {
|
|
WriteLock wl(&prepared_mutex_);
|
|
prepared_txns_.erase(prepare_seq);
|
|
bool was_empty = delayed_prepared_.empty();
|
|
if (!was_empty) {
|
|
delayed_prepared_.erase(prepare_seq);
|
|
bool is_empty = delayed_prepared_.empty();
|
|
if (was_empty != is_empty) {
|
|
delayed_prepared_empty_.store(is_empty, std::memory_order_release);
|
|
}
|
|
}
|
|
}
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end");
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::AddCommitted:end:pause");
|
|
}
|
|
|
|
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
|
|
CommitEntry64b* entry_64b,
|
|
CommitEntry* entry) const {
|
|
*entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire);
|
|
bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
|
|
return valid;
|
|
}
|
|
|
|
bool WritePreparedTxnDB::AddCommitEntry(const uint64_t indexed_seq,
|
|
const CommitEntry& new_entry,
|
|
CommitEntry* evicted_entry) {
|
|
CommitEntry64b new_entry_64b(new_entry, FORMAT);
|
|
CommitEntry64b evicted_entry_64b = commit_cache_[indexed_seq].exchange(
|
|
new_entry_64b, std::memory_order_acq_rel);
|
|
bool valid = evicted_entry_64b.Parse(indexed_seq, evicted_entry, FORMAT);
|
|
return valid;
|
|
}
|
|
|
|
bool WritePreparedTxnDB::ExchangeCommitEntry(const uint64_t indexed_seq,
|
|
CommitEntry64b& expected_entry_64b,
|
|
const CommitEntry& new_entry) {
|
|
auto& atomic_entry = commit_cache_[indexed_seq];
|
|
CommitEntry64b new_entry_64b(new_entry, FORMAT);
|
|
bool succ = atomic_entry.compare_exchange_strong(
|
|
expected_entry_64b, new_entry_64b, std::memory_order_acq_rel,
|
|
std::memory_order_acquire);
|
|
return succ;
|
|
}
|
|
|
|
void WritePreparedTxnDB::AdvanceMaxEvictedSeq(const SequenceNumber& prev_max,
|
|
const SequenceNumber& new_max) {
|
|
ROCKS_LOG_DETAILS(info_log_,
|
|
"AdvanceMaxEvictedSeq overhead %" PRIu64 " => %" PRIu64,
|
|
prev_max, new_max);
|
|
// When max_evicted_seq_ advances, move older entries from prepared_txns_
|
|
// to delayed_prepared_. This guarantees that if a seq is lower than max,
|
|
// then it is not in prepared_txns_ ans save an expensive, synchronized
|
|
// lookup from a shared set. delayed_prepared_ is expected to be empty in
|
|
// normal cases.
|
|
{
|
|
WriteLock wl(&prepared_mutex_);
|
|
while (!prepared_txns_.empty() && prepared_txns_.top() <= new_max) {
|
|
auto to_be_popped = prepared_txns_.top();
|
|
delayed_prepared_.insert(to_be_popped);
|
|
// TODO(myabandeh): also add a stat
|
|
ROCKS_LOG_WARN(info_log_,
|
|
"prepared_mutex_ overhead %" PRIu64 " (prep=%" PRIu64
|
|
" new_max=%" PRIu64 " oldmax=%" PRIu64,
|
|
static_cast<uint64_t>(delayed_prepared_.size()),
|
|
to_be_popped, new_max, prev_max);
|
|
prepared_txns_.pop();
|
|
delayed_prepared_empty_.store(false, std::memory_order_release);
|
|
}
|
|
}
|
|
|
|
// With each change to max_evicted_seq_ fetch the live snapshots behind it.
|
|
// We use max as the version of snapshots to identify how fresh are the
|
|
// snapshot list. This works because the snapshots are between 0 and
|
|
// max, so the larger the max, the more complete they are.
|
|
SequenceNumber new_snapshots_version = new_max;
|
|
std::vector<SequenceNumber> snapshots;
|
|
bool update_snapshots = false;
|
|
if (new_snapshots_version > snapshots_version_) {
|
|
// This is to avoid updating the snapshots_ if it already updated
|
|
// with a more recent vesion by a concrrent thread
|
|
update_snapshots = true;
|
|
// We only care about snapshots lower then max
|
|
snapshots = GetSnapshotListFromDB(new_max);
|
|
}
|
|
if (update_snapshots) {
|
|
UpdateSnapshots(snapshots, new_snapshots_version);
|
|
}
|
|
auto updated_prev_max = prev_max;
|
|
while (updated_prev_max < new_max &&
|
|
!max_evicted_seq_.compare_exchange_weak(updated_prev_max, new_max,
|
|
std::memory_order_acq_rel,
|
|
std::memory_order_relaxed)) {
|
|
};
|
|
}
|
|
|
|
// Takes a snapshot from the underlying DB (not marked as being for
// write-write conflict checking), passes it through EnhanceSnapshot and
// returns it.
const Snapshot* WritePreparedTxnDB::GetSnapshot() {
  constexpr bool kForWWConflictCheck = true;
  SnapshotImpl* new_snapshot = db_impl_->GetSnapshotImpl(!kForWWConflictCheck);
  assert(new_snapshot);
  EnhanceSnapshot(new_snapshot);
  return new_snapshot;
}
|
|
|
|
// Returns the sequence numbers of the DB's live snapshots, as reported by
// snapshots().GetAll() with `max` as the upper bound. The DB mutex is held
// while the snapshot list is read.
const std::vector<SequenceNumber> WritePreparedTxnDB::GetSnapshotListFromDB(
    SequenceNumber max) {
  ROCKS_LOG_DETAILS(info_log_, "GetSnapshotListFromDB with max %" PRIu64, max);
  // The previous statement, `InstrumentedMutex(db_impl_->mutex());`, only
  // constructed and destroyed a temporary mutex object; the DB mutex itself
  // was never acquired. A named lock guard holds it until the function
  // returns.
  InstrumentedMutexLock db_lock(db_impl_->mutex());
  return db_impl_->snapshots().GetAll(nullptr, max);
}
|
|
|
|
void WritePreparedTxnDB::ReleaseSnapshot(const Snapshot* snapshot) {
|
|
auto snap_seq = snapshot->GetSequenceNumber();
|
|
ReleaseSnapshotInternal(snap_seq);
|
|
db_impl_->ReleaseSnapshot(snapshot);
|
|
}
|
|
|
|
void WritePreparedTxnDB::ReleaseSnapshotInternal(
|
|
const SequenceNumber snap_seq) {
|
|
// relax is enough since max increases monotonically, i.e., if snap_seq <
|
|
// old_max => snap_seq < new_max as well.
|
|
if (snap_seq < max_evicted_seq_.load(std::memory_order_relaxed)) {
|
|
// Then this is a rare case that transaction did not finish before max
|
|
// advances. It is expected for a few read-only backup snapshots. For such
|
|
// snapshots we might have kept around a couple of entries in the
|
|
// old_commit_map_. Check and do garbage collection if that is the case.
|
|
bool need_gc = false;
|
|
{
|
|
// TODO(myabandeh): also add a stat
|
|
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
|
ReadLock rl(&old_commit_map_mutex_);
|
|
auto prep_set_entry = old_commit_map_.find(snap_seq);
|
|
need_gc = prep_set_entry != old_commit_map_.end();
|
|
}
|
|
if (need_gc) {
|
|
// TODO(myabandeh): also add a stat
|
|
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
|
WriteLock wl(&old_commit_map_mutex_);
|
|
old_commit_map_.erase(snap_seq);
|
|
old_commit_map_empty_.store(old_commit_map_.empty(),
|
|
std::memory_order_release);
|
|
}
|
|
}
|
|
}
|
|
|
|
void WritePreparedTxnDB::UpdateSnapshots(
|
|
const std::vector<SequenceNumber>& snapshots,
|
|
const SequenceNumber& version) {
|
|
ROCKS_LOG_DETAILS(info_log_, "UpdateSnapshots with version %" PRIu64,
|
|
version);
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:start");
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:start");
|
|
#ifndef NDEBUG
|
|
size_t sync_i = 0;
|
|
#endif
|
|
// TODO(myabandeh): replace me with a stat
|
|
ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
|
|
WriteLock wl(&snapshots_mutex_);
|
|
snapshots_version_ = version;
|
|
// We update the list concurrently with the readers.
|
|
// Both new and old lists are sorted and the new list is subset of the
|
|
// previous list plus some new items. Thus if a snapshot repeats in
|
|
// both new and old lists, it will appear upper in the new list. So if
|
|
// we simply insert the new snapshots in order, if an overwritten item
|
|
// is still valid in the new list is either written to the same place in
|
|
// the array or it is written in a higher palce before it gets
|
|
// overwritten by another item. This guarantess a reader that reads the
|
|
// list bottom-up will eventaully see a snapshot that repeats in the
|
|
// update, either before it gets overwritten by the writer or
|
|
// afterwards.
|
|
size_t i = 0;
|
|
auto it = snapshots.begin();
|
|
for (; it != snapshots.end() && i < SNAPSHOT_CACHE_SIZE; it++, i++) {
|
|
snapshot_cache_[i].store(*it, std::memory_order_release);
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", ++sync_i);
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
|
|
}
|
|
#ifndef NDEBUG
|
|
// Release the remaining sync points since they are useless given that the
|
|
// reader would also use lock to access snapshots
|
|
for (++sync_i; sync_i <= 10; ++sync_i) {
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:", sync_i);
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:", sync_i);
|
|
}
|
|
#endif
|
|
snapshots_.clear();
|
|
for (; it != snapshots.end(); it++) {
|
|
// Insert them to a vector that is less efficient to access
|
|
// concurrently
|
|
snapshots_.push_back(*it);
|
|
}
|
|
// Update the size at the end. Otherwise a parallel reader might read
|
|
// items that are not set yet.
|
|
snapshots_total_.store(snapshots.size(), std::memory_order_release);
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:p:end");
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::UpdateSnapshots:s:end");
|
|
}
|
|
|
|
void WritePreparedTxnDB::CheckAgainstSnapshots(const CommitEntry& evicted) {
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:start");
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:start");
|
|
#ifndef NDEBUG
|
|
size_t sync_i = 0;
|
|
#endif
|
|
// First check the snapshot cache that is efficient for concurrent access
|
|
auto cnt = snapshots_total_.load(std::memory_order_acquire);
|
|
// The list might get updated concurrently as we are reading from it. The
|
|
// reader should be able to read all the snapshots that are still valid
|
|
// after the update. Since the survived snapshots are written in a higher
|
|
// place before gets overwritten the reader that reads bottom-up will
|
|
// eventully see it.
|
|
const bool next_is_larger = true;
|
|
SequenceNumber snapshot_seq = kMaxSequenceNumber;
|
|
size_t ip1 = std::min(cnt, SNAPSHOT_CACHE_SIZE);
|
|
for (; 0 < ip1; ip1--) {
|
|
snapshot_seq = snapshot_cache_[ip1 - 1].load(std::memory_order_acquire);
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:",
|
|
++sync_i);
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
|
|
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
|
|
snapshot_seq, !next_is_larger)) {
|
|
break;
|
|
}
|
|
}
|
|
#ifndef NDEBUG
|
|
// Release the remaining sync points before accquiring the lock
|
|
for (++sync_i; sync_i <= 10; ++sync_i) {
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:", sync_i);
|
|
TEST_IDX_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:", sync_i);
|
|
}
|
|
#endif
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:p:end");
|
|
TEST_SYNC_POINT("WritePreparedTxnDB::CheckAgainstSnapshots:s:end");
|
|
if (UNLIKELY(SNAPSHOT_CACHE_SIZE < cnt && ip1 == SNAPSHOT_CACHE_SIZE &&
|
|
snapshot_seq < evicted.prep_seq)) {
|
|
// Then access the less efficient list of snapshots_
|
|
// TODO(myabandeh): also add a stat
|
|
ROCKS_LOG_WARN(info_log_, "snapshots_mutex_ overhead");
|
|
ReadLock rl(&snapshots_mutex_);
|
|
// Items could have moved from the snapshots_ to snapshot_cache_ before
|
|
// accquiring the lock. To make sure that we do not miss a valid snapshot,
|
|
// read snapshot_cache_ again while holding the lock.
|
|
for (size_t i = 0; i < SNAPSHOT_CACHE_SIZE; i++) {
|
|
snapshot_seq = snapshot_cache_[i].load(std::memory_order_acquire);
|
|
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
|
|
snapshot_seq, next_is_larger)) {
|
|
break;
|
|
}
|
|
}
|
|
for (auto snapshot_seq_2 : snapshots_) {
|
|
if (!MaybeUpdateOldCommitMap(evicted.prep_seq, evicted.commit_seq,
|
|
snapshot_seq_2, next_is_larger)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
bool WritePreparedTxnDB::MaybeUpdateOldCommitMap(
|
|
const uint64_t& prep_seq, const uint64_t& commit_seq,
|
|
const uint64_t& snapshot_seq, const bool next_is_larger = true) {
|
|
// If we do not store an entry in old_commit_map_ we assume it is committed in
|
|
// all snapshots. If commit_seq <= snapshot_seq, it is considered already in
|
|
// the snapshot so we need not to keep the entry around for this snapshot.
|
|
if (commit_seq <= snapshot_seq) {
|
|
// continue the search if the next snapshot could be smaller than commit_seq
|
|
return !next_is_larger;
|
|
}
|
|
// then snapshot_seq < commit_seq
|
|
if (prep_seq <= snapshot_seq) { // overlapping range
|
|
ROCKS_LOG_WARN(info_log_, "old_commit_map_mutex_ overhead");
|
|
// TODO(myabandeh): also add a stat
|
|
WriteLock wl(&old_commit_map_mutex_);
|
|
old_commit_map_empty_.store(false, std::memory_order_release);
|
|
auto& vec = old_commit_map_[snapshot_seq];
|
|
vec.insert(std::upper_bound(vec.begin(), vec.end(), prep_seq), prep_seq);
|
|
// We need to store it once for each overlapping snapshot. Returning true to
|
|
// continue the search if there is more overlapping snapshot.
|
|
return true;
|
|
}
|
|
// continue the search if the next snapshot could be larger than prep_seq
|
|
return next_is_larger;
|
|
}
|
|
|
|
// Cancels all background work of the DB and waits for it to finish before
// the object goes away.
WritePreparedTxnDB::~WritePreparedTxnDB() {
  // A compaction or flush that is still running could hold a SnapshotChecker,
  // which in turn points back to this WritePreparedTxnDB. Such jobs have to be
  // finished before the destruction proceeds.
  constexpr bool kWaitForJobs = true;
  db_impl_->CancelAllBackgroundWork(kWaitForJobs);
}
|
|
|
|
void SubBatchCounter::InitWithComp(const uint32_t cf) {
|
|
auto cmp = comparators_[cf];
|
|
keys_[cf] = CFKeys(SetComparator(cmp));
|
|
}
|
|
|
|
void SubBatchCounter::AddKey(const uint32_t cf, const Slice& key) {
|
|
CFKeys& cf_keys = keys_[cf];
|
|
if (cf_keys.size() == 0) { // just inserted
|
|
InitWithComp(cf);
|
|
}
|
|
auto it = cf_keys.insert(key);
|
|
if (it.second == false) { // second is false if a element already existed.
|
|
batches_++;
|
|
keys_.clear();
|
|
InitWithComp(cf);
|
|
keys_[cf].insert(key);
|
|
}
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
#endif // ROCKSDB_LITE
|