WritePrepared Txn: Compaction/Flush

Summary:
Update Compaction/Flush to support WritePreparedTxnDB: Add SnapshotChecker which is a proxy to query WritePreparedTxnDB::IsInSnapshot. Pass SnapshotChecker to DBImpl on WritePreparedTxnDB open. CompactionIterator use it to check if a key has been committed and if it is visible to a snapshot. In CompactionIterator:
* check if key has been committed. If not, output uncommitted keys AS-IS.
* use SnapshotChecker to check if key is visible to a snapshot when in need.
* do not output key with seq = 0 if the key is not committed.
Closes https://github.com/facebook/rocksdb/pull/2926

Differential Revision: D5902907

Pulled By: yiwu-arbug

fbshipit-source-id: 945e037fdf0aa652dc5ba0ad879461040baa0320
This commit is contained in:
Yi Wu 2017-10-06 10:26:38 -07:00 committed by Facebook Github Bot
parent 01542400a8
commit d1b74b0c82
23 changed files with 592 additions and 147 deletions

View File

@ -543,10 +543,11 @@ set(SOURCES
utilities/table_properties_collectors/compact_on_deletion_collector.cc
utilities/transactions/optimistic_transaction_db_impl.cc
utilities/transactions/optimistic_transaction.cc
utilities/transactions/transaction_base.cc
utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/pessimistic_transaction.cc
utilities/transactions/pessimistic_transaction_db.cc
utilities/transactions/snapshot_checker.cc
utilities/transactions/transaction_base.cc
utilities/transactions/transaction_db_mutex_impl.cc
utilities/transactions/transaction_lock_mgr.cc
utilities/transactions/transaction_util.cc
utilities/transactions/write_prepared_txn.cc

View File

@ -248,10 +248,11 @@ cpp_library(
"utilities/table_properties_collectors/compact_on_deletion_collector.cc",
"utilities/transactions/optimistic_transaction_db_impl.cc",
"utilities/transactions/optimistic_transaction.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/pessimistic_transaction.cc",
"utilities/transactions/pessimistic_transaction_db.cc",
"utilities/transactions/snapshot_checker.cc",
"utilities/transactions/transaction_base.cc",
"utilities/transactions/transaction_db_mutex_impl.cc",
"utilities/transactions/transaction_lock_mgr.cc",
"utilities/transactions/transaction_util.cc",
"utilities/transactions/write_prepared_txn.cc",

View File

@ -70,7 +70,7 @@ Status BuildTable(
uint32_t column_family_id, const std::string& column_family_name,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression,
SnapshotChecker* snapshot_checker, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
@ -135,7 +135,7 @@ Status BuildTable(
CompactionIterator c_iter(
iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
&snapshots, earliest_write_conflict_snapshot, env,
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
true /* internal key corruption is not ok */, range_del_agg.get());
c_iter.SeekToFirst();
for (; c_iter.Valid(); c_iter.Next()) {

View File

@ -29,6 +29,7 @@ struct FileMetaData;
class Env;
struct EnvOptions;
class Iterator;
class SnapshotChecker;
class TableCache;
class VersionEdit;
class TableBuilder;
@ -71,7 +72,7 @@ extern Status BuildTable(
uint32_t column_family_id, const std::string& column_family_name,
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const CompressionType compression,
SnapshotChecker* snapshot_checker, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,

View File

@ -4,6 +4,9 @@
// (found in the LICENSE.Apache file in the root directory).
#include "db/compaction_iterator.h"
#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"
@ -37,15 +40,16 @@ CompactionEventListener::CompactionListenerValueType fromInternalValueType(
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
const Compaction* compaction, const CompactionFilter* compaction_filter,
CompactionEventListener* compaction_listener,
const std::atomic<bool>* shutting_down)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, env, expect_valid_internal_key,
range_del_agg,
earliest_write_conflict_snapshot, snapshot_checker, env,
expect_valid_internal_key, range_del_agg,
std::unique_ptr<CompactionProxy>(
compaction ? new CompactionProxy(compaction) : nullptr),
compaction_filter, compaction_listener, shutting_down) {}
@ -53,7 +57,8 @@ CompactionIterator::CompactionIterator(
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key, RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
@ -64,6 +69,7 @@ CompactionIterator::CompactionIterator(
merge_helper_(merge_helper),
snapshots_(snapshots),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
env_(env),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
@ -166,6 +172,55 @@ void CompactionIterator::Next() {
PrepareOutput();
}
void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
ignore_snapshots_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter::Decision filter;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
{
StopWatchNano timer(env_, true);
filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key,
CompactionFilter::ValueType::kValue, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;
}
if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
*need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
*skip_until = compaction_filter_skip_until_.Encode();
}
}
}
void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;
@ -220,6 +275,9 @@ void CompactionIterator::NextFromInput() {
has_outputted_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
current_key_committed_ =
(snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber));
#ifndef ROCKSDB_LITE
if (compaction_listener_) {
@ -227,53 +285,12 @@ void CompactionIterator::NextFromInput() {
fromInternalValueType(ikey_.type),
value_, ikey_.sequence, true);
}
#endif // ROCKSDB_LITE
#endif // !ROCKSDB_LITE
// apply the compaction filter to the first occurrence of the user key
if (compaction_filter_ != nullptr && ikey_.type == kTypeValue &&
(visible_at_tip_ || ikey_.sequence > latest_snapshot_ ||
ignore_snapshots_)) {
// If the user has specified a compaction filter and the sequence
// number is greater than any external snapshot, then invoke the
// filter. If the return value of the compaction filter is true,
// replace the entry with a deletion marker.
CompactionFilter::Decision filter;
compaction_filter_value_.clear();
compaction_filter_skip_until_.Clear();
{
StopWatchNano timer(env_, true);
filter = compaction_filter_->FilterV2(
compaction_->level(), ikey_.user_key,
CompactionFilter::ValueType::kValue, value_,
&compaction_filter_value_, compaction_filter_skip_until_.rep());
iter_stats_.total_filter_time +=
env_ != nullptr ? timer.ElapsedNanos() : 0;
}
if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
cmp_->Compare(*compaction_filter_skip_until_.rep(),
ikey_.user_key) <= 0) {
// Can't skip to a key smaller than the current one.
// Keep the key as per FilterV2 documentation.
filter = CompactionFilter::Decision::kKeep;
}
if (filter == CompactionFilter::Decision::kRemove) {
// convert the current key to a delete; key_ is pointing into
// current_key_ at this point, so updating current_key_ updates key()
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
skip_until = compaction_filter_skip_until_.Encode();
}
// Apply the compaction filter to the first committed version of the user
// key.
if (current_key_committed_) {
InvokeFilterIfNeeded(&need_skip, &skip_until);
}
} else {
#ifndef ROCKSDB_LITE
@ -292,6 +309,26 @@ void CompactionIterator::NextFromInput() {
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetInternalKey();
ikey_.user_key = current_key_.GetUserKey();
// Note that newer version of a key is ordered before older versions. If a
// newer version of a key is committed, so as the older version. No need
// to query snapshot_checker_ in that case.
if (UNLIKELY(!current_key_committed_)) {
assert(snapshot_checker_ != nullptr);
current_key_committed_ =
snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber);
// Apply the compaction filter to the first committed version of the
// user key.
if (current_key_committed_) {
InvokeFilterIfNeeded(&need_skip, &skip_until);
}
}
}
if (UNLIKELY(!current_key_committed_)) {
assert(snapshot_checker_ != nullptr);
valid_ = true;
break;
}
// If there are no snapshots, then this kv affect visibility at tip.
@ -557,6 +594,9 @@ void CompactionIterator::PrepareOutput() {
// only care about sequence number larger than any active snapshots.
if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
(snapshot_checker_ == nullptr ||
LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
earliest_snapshot_))) &&
ikey_.type != kTypeMerge &&
!cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
@ -568,10 +608,11 @@ void CompactionIterator::PrepareOutput() {
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
SequenceNumber in, SequenceNumber* prev_snapshot) {
assert(snapshots_->size());
SequenceNumber prev __attribute__((__unused__)) = kMaxSequenceNumber;
SequenceNumber prev = kMaxSequenceNumber;
for (const auto cur : *snapshots_) {
assert(prev == kMaxSequenceNumber || prev <= cur);
if (cur >= in) {
if (cur >= in && (snapshot_checker_ == nullptr ||
snapshot_checker_->IsInSnapshot(in, cur))) {
*prev_snapshot = prev == kMaxSequenceNumber ? 0 : prev;
return cur;
}

View File

@ -14,6 +14,7 @@
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "db/snapshot_checker.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"
@ -59,7 +60,8 @@ class CompactionIterator {
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
const Compaction* compaction = nullptr,
@ -71,7 +73,8 @@ class CompactionIterator {
CompactionIterator(InternalIterator* input, const Comparator* cmp,
MergeHelper* merge_helper, SequenceNumber last_sequence,
std::vector<SequenceNumber>* snapshots,
SequenceNumber earliest_write_conflict_snapshot, Env* env,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool expect_valid_internal_key,
RangeDelAggregator* range_del_agg,
std::unique_ptr<CompactionProxy> compaction,
@ -111,6 +114,9 @@ class CompactionIterator {
// compression.
void PrepareOutput();
// Invoke compaction filter if needed.
void InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
// Given a sequence number, return the sequence number of the
// earliest snapshot that this sequence number is visible in.
// The snapshots themselves are arranged in ascending order of
@ -125,6 +131,7 @@ class CompactionIterator {
MergeHelper* merge_helper_;
const std::vector<SequenceNumber>* snapshots_;
const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
bool expect_valid_internal_key_;
RangeDelAggregator* range_del_agg_;
@ -132,7 +139,7 @@ class CompactionIterator {
const CompactionFilter* compaction_filter_;
#ifndef ROCKSDB_LITE
CompactionEventListener* compaction_listener_;
#endif // ROCKSDB_LITE
#endif // !ROCKSDB_LITE
const std::atomic<bool>* shutting_down_;
bool bottommost_level_;
bool valid_ = false;
@ -189,6 +196,10 @@ class CompactionIterator {
std::vector<size_t> level_ptrs_;
CompactionIterationStats iter_stats_;
// Used to avoid purging uncommitted values. The application can specify
// uncommitted values by providing a SnapshotChecker object.
bool current_key_committed_;
bool IsShuttingDown() {
// This is a best-effort facility, so memory_order_relaxed is sufficient.
return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);

View File

@ -181,6 +181,8 @@ class CompactionIteratorTest : public testing::Test {
compaction_proxy_ = new FakeCompaction();
compaction.reset(compaction_proxy_);
}
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;
merge_helper_.reset(new MergeHelper(Env::Default(), cmp_, merge_op, filter,
nullptr, false, 0, 0, nullptr,
@ -189,8 +191,9 @@ class CompactionIteratorTest : public testing::Test {
iter_->SeekToFirst();
c_iter_.reset(new CompactionIterator(
iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
kMaxSequenceNumber, Env::Default(), false, range_del_agg_.get(),
std::move(compaction), filter, nullptr, &shutting_down_));
kMaxSequenceNumber, snapshot_checker, Env::Default(), false,
range_del_agg_.get(), std::move(compaction), filter, nullptr,
&shutting_down_));
}
void AddSnapshot(SequenceNumber snapshot) { snapshots_.push_back(snapshot); }

View File

@ -269,9 +269,9 @@ CompactionJob::CompactionJob(
InstrumentedMutex* db_mutex, Status* db_bg_error,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
CompactionJobStats* compaction_job_stats)
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
@ -290,6 +290,7 @@ CompactionJob::CompactionJob(
db_bg_error_(db_bg_error),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
paranoid_file_checks_(paranoid_file_checks),
@ -760,9 +761,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
range_del_agg.get(), sub_compact->compaction, compaction_filter,
comp_event_listener, shutting_down_));
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, false, range_del_agg.get(),
sub_compact->compaction, compaction_filter, comp_event_listener,
shutting_down_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() &&

View File

@ -45,12 +45,13 @@
namespace rocksdb {
class Arena;
class MemTable;
class SnapshotChecker;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;
class CompactionJob {
public:
@ -63,6 +64,7 @@ class CompactionJob {
Status* db_bg_error,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname,
@ -149,6 +151,8 @@ class CompactionJob {
// should make sure not to remove evidence that a write occurred.
SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
std::shared_ptr<Cache> table_cache_;
EventLogger* event_logger_;

View File

@ -250,11 +250,14 @@ class CompactionJobTest : public testing::Test {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
mutex_.Lock();
EventLogger event_logger(db_options_.info_log.get());
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, &log_buffer, nullptr, nullptr, nullptr, &mutex_,
&bg_error_, snapshots, earliest_write_conflict_snapshot, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_);
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, &mutex_, &bg_error_,
snapshots, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger,
false, false, dbname_, &compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);

View File

@ -29,6 +29,7 @@
#include "db/internal_stats.h"
#include "db/log_writer.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "db/snapshot_impl.h"
#include "db/version_edit.h"
#include "db/wal_manager.h"
@ -53,13 +54,13 @@
namespace rocksdb {
class Arena;
class ArenaWrappedDBIter;
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;
class WriteCallback;
struct JobContext;
struct ExternalSstFileInfo;
@ -573,6 +574,9 @@ class DBImpl : public DB {
void AddToLogsToFreeQueue(log::Writer* log_writer) {
logs_to_free_queue_.push_back(log_writer);
}
void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
InstrumentedMutex* mutex() { return &mutex_; }
Status NewDB();
@ -1231,6 +1235,10 @@ class DBImpl : public DB {
std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
std::mutex prep_heap_mutex_;
// Callback for compaction to check if a key is visible to a snapshot.
// REQUIRES: mutex held
std::unique_ptr<SnapshotChecker> snapshot_checker_;
// No copying allowed
DBImpl(const DBImpl&);
void operator=(const DBImpl&);

View File

@ -88,8 +88,8 @@ Status DBImpl::FlushMemTableToOutputFile(
FlushJob flush_job(
dbname_, cfd, immutable_db_options_, mutable_cf_options,
env_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker_.get(),
job_context, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
@ -534,10 +534,10 @@ Status DBImpl::CompactFilesImpl(
assert(is_snapshot_supported_ || snapshots_.empty());
CompactionJob compaction_job(
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
env_options_for_compaction_, versions_.get(), &shutting_down_, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(c->output_path_id()),
stats_, &mutex_, &bg_error_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker_.get(), table_cache_,
&event_logger_, c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
nullptr); // Here we pass a nullptr for CompactionJobStats because
@ -1684,8 +1684,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
env_options_for_compaction_, versions_.get(), &shutting_down_,
log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs,
earliest_write_conflict_snapshot, table_cache_, &event_logger_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker_.get(), table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
&compaction_job_stats);
@ -1910,4 +1910,13 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
mutable_cf_options.write_buffer_size *
mutable_cf_options.max_write_buffer_number;
}
void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {
InstrumentedMutexLock l(&mutex_);
// snapshot_checker_ should only set once. If we need to set it multiple
// times, we need to make sure the old one is not deleted while it is still
// using by a compaction job.
assert(!snapshot_checker_);
snapshot_checker_.reset(snapshot_checker);
}
} // namespace rocksdb

View File

@ -883,6 +883,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
SequenceNumber earliest_write_conflict_snapshot;
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
// Only TransactionDB passes snapshot_checker and it creates it after db
// open. Just pass nullptr here.
SnapshotChecker* snapshot_checker = nullptr;
s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
@ -890,7 +893,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,

View File

@ -62,8 +62,9 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_file_directory,
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats)
: dbname_(dbname),
@ -76,6 +77,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
shutting_down_(shutting_down),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
job_context_(job_context),
log_buffer_(log_buffer),
db_directory_(db_directory),
@ -303,8 +305,8 @@ Status FlushJob::WriteLevel0Table() {
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, output_compression_,
cfd_->ioptions()->compression_opts,
earliest_write_conflict_snapshot_, snapshot_checker_,
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time);

View File

@ -43,6 +43,7 @@
namespace rocksdb {
class MemTable;
class SnapshotChecker;
class TableCache;
class Version;
class VersionEdit;
@ -56,15 +57,14 @@ class FlushJob {
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions env_options,
VersionSet* versions, InstrumentedMutex* db_mutex,
std::atomic<bool>* shutting_down,
const EnvOptions env_options, VersionSet* versions,
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
JobContext* job_context, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_file_directory,
CompressionType output_compression, Statistics* stats,
EventLogger* event_logger, bool measure_io_stats);
SnapshotChecker* snapshot_checker, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
Directory* output_file_directory, CompressionType output_compression,
Statistics* stats, EventLogger* event_logger, bool measure_io_stats);
~FlushJob();
@ -90,6 +90,7 @@ class FlushJob {
std::atomic<bool>* shutting_down_;
std::vector<SequenceNumber> existing_snapshots_;
SequenceNumber earliest_write_conflict_snapshot_;
SnapshotChecker* snapshot_checker_;
JobContext* job_context_;
LogBuffer* log_buffer_;
Directory* db_directory_;

View File

@ -92,12 +92,12 @@ TEST_F(FlushJobTest, Empty) {
JobContext job_context(0);
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, &job_context,
nullptr, nullptr, nullptr, kNoCompression, nullptr,
&event_logger, false);
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context,
nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, false);
{
InstrumentedMutexLock l(&mutex_);
flush_job.PickMemTable();
@ -137,12 +137,12 @@ TEST_F(FlushJobTest, NonEmpty) {
}
EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, &job_context,
nullptr, nullptr, nullptr, kNoCompression, nullptr,
&event_logger, true);
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_,
&shutting_down_, {}, kMaxSequenceNumber, snapshot_checker, &job_context,
nullptr, nullptr, nullptr, kNoCompression, nullptr, &event_logger, true);
FileMetaData fd;
mutex_.Lock();
flush_job.PickMemTable();
@ -204,12 +204,13 @@ TEST_F(FlushJobTest, Snapshots) {
}
EventLogger event_logger(db_options_.info_log.get());
FlushJob flush_job(
dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_,
*cfd->GetLatestMutableCFOptions(), env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshots, kMaxSequenceNumber,
&job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr,
&event_logger, true);
SnapshotChecker* snapshot_checker = nullptr; // not relavant
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_,
snapshots, kMaxSequenceNumber, snapshot_checker,
&job_context, nullptr, nullptr, nullptr, kNoCompression,
nullptr, &event_logger, true);
mutex_.Lock();
flush_job.PickMemTable();
ASSERT_OK(flush_job.Run());

View File

@ -400,6 +400,10 @@ class Repairer {
int64_t _current_time = 0;
status = env_->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
// Only TransactionDB make use of snapshot_checker and repair doesn't
// currently support TransactionDB with uncommitted prepared keys in WAL.
// TODO(yiwu) Support repairing TransactionDB.
SnapshotChecker* snapshot_checker = nullptr;
status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
@ -407,10 +411,11 @@ class Repairer {
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
{}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false,
nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time);
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),

28
db/snapshot_checker.h Normal file
View File

@ -0,0 +1,28 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "rocksdb/types.h"
namespace rocksdb {
class WritePreparedTxnDB;
// Callback class created by WritePreparedTxnDB to check if a key
// is visible by a snapshot.
class SnapshotChecker {
public:
explicit SnapshotChecker(WritePreparedTxnDB* txn_db);
bool IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const;
private:
#ifndef ROCKSDB_LITE
const WritePreparedTxnDB* const txn_db_;
#endif // !ROCKSDB_LITE
};
} // namespace rocksdb

11
src.mk
View File

@ -166,7 +166,7 @@ LIB_SOURCES = \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \
utilities/date_tiered/date_tiered_db_impl.cc \
utilities/debug.cc \
utilities/debug.cc \
utilities/document/document_db.cc \
utilities/document/json_document.cc \
utilities/document/json_document_builder.cc \
@ -193,14 +193,15 @@ LIB_SOURCES = \
utilities/spatialdb/spatial_db.cc \
utilities/table_properties_collectors/compact_on_deletion_collector.cc \
utilities/transactions/optimistic_transaction_db_impl.cc \
utilities/transactions/optimistic_transaction.cc \
utilities/transactions/optimistic_transaction.cc \
utilities/transactions/pessimistic_transaction.cc \
utilities/transactions/pessimistic_transaction_db.cc \
utilities/transactions/snapshot_checker.cc \
utilities/transactions/transaction_base.cc \
utilities/transactions/pessimistic_transaction_db.cc \
utilities/transactions/transaction_db_mutex_impl.cc \
utilities/transactions/pessimistic_transaction.cc \
utilities/transactions/transaction_lock_mgr.cc \
utilities/transactions/transaction_util.cc \
utilities/transactions/write_prepared_txn.cc \
utilities/transactions/write_prepared_txn.cc \
utilities/ttl/db_ttl_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \

View File

@ -147,6 +147,9 @@ Status WritePreparedTxnDB::Initialize(
SequenceNumber prev_max = max_evicted_seq_;
SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
AdvanceMaxEvictedSeq(prev_max, last_seq);
db_impl_->SetSnapshotChecker(new SnapshotChecker(this));
auto s = PessimisticTransactionDB::Initialize(compaction_enabled_cf_indices,
handles);
return s;
@ -573,7 +576,11 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
const Slice& key, PinnableSlice* value) {
// We are fine with the latest committed value. This could be done by
// specifying the snapshot as kMaxSequenceNumber.
WritePreparedTxnReadCallback callback(this, kMaxSequenceNumber);
SequenceNumber seq = kMaxSequenceNumber;
if (options.snapshot != nullptr) {
seq = options.snapshot->GetSequenceNumber();
}
WritePreparedTxnReadCallback callback(this, seq);
bool* dont_care = nullptr;
// Note: no need to specify a snapshot for read options as no specific
// snapshot is requested by the user.
@ -581,9 +588,20 @@ Status WritePreparedTxnDB::Get(const ReadOptions& options,
&callback);
}
void WritePreparedTxnDB::Init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
INC_STEP_FOR_MAX_EVICTED =
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
// Returns true if commit_seq <= snapshot_seq
bool WritePreparedTxnDB::IsInSnapshot(uint64_t prep_seq,
uint64_t snapshot_seq) {
uint64_t snapshot_seq) const {
// Here we try to infer the return value without looking into prepare list.
// This would help avoiding synchronization over a shared map.
// TODO(myabandeh): read your own writes
@ -734,7 +752,7 @@ void WritePreparedTxnDB::AddCommitted(uint64_t prepare_seq,
bool WritePreparedTxnDB::GetCommitEntry(const uint64_t indexed_seq,
CommitEntry64b* entry_64b,
CommitEntry* entry) {
CommitEntry* entry) const {
*entry_64b = commit_cache_[indexed_seq].load(std::memory_order_acquire);
bool valid = entry_64b->Parse(indexed_seq, entry, FORMAT);
return valid;

View File

@ -14,6 +14,7 @@
#include <vector>
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
@ -174,7 +175,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
init(txn_db_options);
Init(txn_db_options);
}
explicit WritePreparedTxnDB(
@ -187,7 +188,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
COMMIT_CACHE_BITS(commit_cache_bits),
COMMIT_CACHE_SIZE(static_cast<size_t>(1ull << COMMIT_CACHE_BITS)),
FORMAT(COMMIT_CACHE_BITS) {
init(txn_db_options);
Init(txn_db_options);
}
virtual ~WritePreparedTxnDB() {}
@ -207,7 +208,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Check whether the transaction that wrote the value with seqeunce number seq
// is visible to the snapshot with sequence number snapshot_seq
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq);
bool IsInSnapshot(uint64_t seq, uint64_t snapshot_seq) const;
// Add the trasnaction with prepare sequence seq to the prepared list
void AddPrepared(uint64_t seq);
// Rollback a prepared txn identified with prep_seq. rollback_seq is the seq
@ -312,16 +313,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
friend class WritePreparedTransactionTest_IsInSnapshotEmptyMapTest_Test;
friend class WritePreparedTransactionTest_RollbackTest_Test;
void init(const TransactionDBOptions& /* unused */) {
// Adcance max_evicted_seq_ no more than 100 times before the cache wraps
// around.
INC_STEP_FOR_MAX_EVICTED =
std::max(SNAPSHOT_CACHE_SIZE / 100, static_cast<size_t>(1));
snapshot_cache_ = unique_ptr<std::atomic<SequenceNumber>[]>(
new std::atomic<SequenceNumber>[SNAPSHOT_CACHE_SIZE] {});
commit_cache_ = unique_ptr<std::atomic<CommitEntry64b>[]>(
new std::atomic<CommitEntry64b>[COMMIT_CACHE_SIZE] {});
}
void Init(const TransactionDBOptions& /* unused */);
// A heap with the amortized O(1) complexity for erase. It uses one extra heap
// to keep track of erased entries that are not yet on top of the main heap.
@ -363,7 +355,7 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
// Get the commit entry with index indexed_seq from the commit table. It
// returns true if such entry exists.
bool GetCommitEntry(const uint64_t indexed_seq, CommitEntry64b* entry_64b,
CommitEntry* entry);
CommitEntry* entry) const;
// Rewrite the entry with the index indexed_seq in the commit table with the
// commit entry <prep_seq, commit_seq>. If the rewrite results into eviction,
@ -467,10 +459,10 @@ class WritePreparedTxnDB : public PessimisticTransactionDB {
std::atomic<bool> delayed_prepared_empty_ = {true};
// Update when old_commit_map_.empty() changes. Expected to be true normally.
std::atomic<bool> old_commit_map_empty_ = {true};
port::RWMutex prepared_mutex_;
port::RWMutex old_commit_map_mutex_;
port::RWMutex commit_cache_mutex_;
port::RWMutex snapshots_mutex_;
mutable port::RWMutex prepared_mutex_;
mutable port::RWMutex old_commit_map_mutex_;
mutable port::RWMutex commit_cache_mutex_;
mutable port::RWMutex snapshots_mutex_;
};
class WritePreparedTxnReadCallback : public ReadCallback {

View File

@ -0,0 +1,37 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/snapshot_checker.h"
#ifdef ROCKSDB_LITE
#include <assert.h>
#endif // ROCKSDB_LITE
#include "utilities/transactions/pessimistic_transaction_db.h"
namespace rocksdb {
#ifdef ROCKSDB_LITE
SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db) {}
bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const {
// Should never be called in LITE mode.
assert(false);
return true;
}
#else
SnapshotChecker::SnapshotChecker(WritePreparedTxnDB* txn_db)
: txn_db_(txn_db){};
bool SnapshotChecker::IsInSnapshot(SequenceNumber sequence,
SequenceNumber snapshot_sequence) const {
return txn_db_->IsInSnapshot(sequence, snapshot_sequence);
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

View File

@ -18,8 +18,10 @@
#include <thread>
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/utilities/debug.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"
@ -602,6 +604,9 @@ TEST_P(WritePreparedTransactionTest, SeqAdvanceTest) {
WriteOptions wopts;
FlushOptions fopt;
options.disable_auto_compactions = true;
ReOpen();
// Do the test with NUM_BRANCHES branches in it. Each run of a test takes some
// of the branches. This is the same as counting a binary number where i-th
// bit represents whether we take branch i in the represented by the number.
@ -1363,6 +1368,274 @@ TEST_P(WritePreparedTransactionTest, SequenceNumberZeroTest) {
db->ReleaseSnapshot(snapshot);
}
// Compaction should not remove a key if it is not committed, and should
// proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
options.disable_auto_compactions = true;
ReOpen();
// Snapshots to avoid keys get evicted.
std::vector<const Snapshot*> snapshots;
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto add_key = [&](std::function<Status()> func) {
ASSERT_OK(func());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
snapshots.push_back(db->GetSnapshot());
};
// Each key here represent a standalone test case.
add_key([&]() { return db->Put(WriteOptions(), "key1", "value1_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key2", "value2_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key3", "value3_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key4", "value4_1"); });
add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_1"); });
add_key([&]() { return db->Merge(WriteOptions(), "key5", "value5_2"); });
add_key([&]() { return db->Put(WriteOptions(), "key6", "value6_1"); });
add_key([&]() { return db->Put(WriteOptions(), "key7", "value7_1"); });
ASSERT_OK(db->Flush(FlushOptions()));
add_key([&]() { return db->Delete(WriteOptions(), "key6"); });
add_key([&]() { return db->SingleDelete(WriteOptions(), "key7"); });
auto* transaction = db->BeginTransaction(WriteOptions());
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Put("key1", "value1_2"));
ASSERT_OK(transaction->Delete("key2"));
ASSERT_OK(transaction->SingleDelete("key3"));
ASSERT_OK(transaction->Merge("key4", "value4_2"));
ASSERT_OK(transaction->Merge("key5", "value5_3"));
ASSERT_OK(transaction->Put("key6", "value6_2"));
ASSERT_OK(transaction->Put("key7", "value7_2"));
// Prepare but not commit.
ASSERT_OK(transaction->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
ASSERT_OK(db->Flush(FlushOptions()));
for (auto* s : snapshots) {
db->ReleaseSnapshot(s);
}
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyKeys({
{"key1", "value1_1"},
{"key2", "value2_1"},
{"key3", "value3_1"},
{"key4", "value4_1"},
{"key5", "value5_1,value5_2"},
{"key6", "NOT_FOUND"},
{"key7", "NOT_FOUND"},
});
VerifyInternalKeys({
{"key1", "value1_2", expected_seq, kTypeValue},
{"key1", "value1_1", 0, kTypeValue},
{"key2", "", expected_seq, kTypeDeletion},
{"key2", "value2_1", 0, kTypeValue},
{"key3", "", expected_seq, kTypeSingleDeletion},
{"key3", "value3_1", 0, kTypeValue},
{"key4", "value4_2", expected_seq, kTypeMerge},
{"key4", "value4_1", 0, kTypeValue},
{"key5", "value5_3", expected_seq, kTypeMerge},
{"key5", "value5_1,value5_2", 0, kTypeValue},
{"key6", "value6_2", expected_seq, kTypeValue},
{"key7", "value7_2", expected_seq, kTypeValue},
});
ASSERT_OK(transaction->Commit());
VerifyKeys({
{"key1", "value1_2"},
{"key2", "NOT_FOUND"},
{"key3", "NOT_FOUND"},
{"key4", "value4_1,value4_2"},
{"key5", "value5_1,value5_2,value5_3"},
{"key6", "value6_2"},
{"key7", "value7_2"},
});
delete transaction;
}
// Compaction should keep keys visible to a snapshot based on commit sequence,
// not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
options.disable_auto_compactions = true;
ReOpen();
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto* txn1 = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn1->SetName("txn1"));
ASSERT_OK(txn1->Put("key1", "value1_1"));
ASSERT_OK(txn1->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
ASSERT_OK(txn1->Commit());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
delete txn1;
// Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot1 = db->GetSnapshot();
auto* txn2 = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn2->SetName("txn2"));
ASSERT_OK(txn2->Put("key2", "value2_1"));
ASSERT_OK(txn2->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
// txn1 commit before snapshot2 and it is visible to snapshot2.
// txn2 commit after snapshot2 and it is not visible.
const Snapshot* snapshot2 = db->GetSnapshot();
ASSERT_OK(txn2->Commit());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
delete txn2;
// Take a snapshots to avoid keys get evicted before compaction.
const Snapshot* snapshot3 = db->GetSnapshot();
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_2"));
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
SequenceNumber seq1 = expected_seq;
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2_2"));
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
SequenceNumber seq2 = expected_seq;
ASSERT_OK(db->Flush(FlushOptions()));
db->ReleaseSnapshot(snapshot1);
db->ReleaseSnapshot(snapshot3);
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyKeys({{"key1", "value1_2"}, {"key2", "value2_2"}});
VerifyKeys({{"key1", "value1_1"}, {"key2", "NOT_FOUND"}}, snapshot2);
VerifyInternalKeys({
{"key1", "value1_2", seq1, kTypeValue},
// "value1_1" is visible to snapshot2. Also keys at bottom level visible
// to earliest snapshot will output with seq = 0.
{"key1", "value1_1", 0, kTypeValue},
{"key2", "value2_2", seq2, kTypeValue},
});
db->ReleaseSnapshot(snapshot2);
}
// A more complex test to verify compaction/flush should keep keys visible
// to snapshots.
TEST_P(WritePreparedTransactionTest,
DISABLED_CompactionShouldKeepSnapshotVisibleKeysRandomized) {
constexpr size_t kNumTransactions = 10;
constexpr size_t kNumIterations = 1000;
std::vector<Transaction*> transactions(kNumTransactions, nullptr);
std::vector<size_t> versions(kNumTransactions, 0);
std::unordered_map<std::string, std::string> current_data;
std::vector<const Snapshot*> snapshots;
std::vector<std::unordered_map<std::string, std::string>> snapshot_data;
Random rnd(1103);
options.disable_auto_compactions = true;
ReOpen();
for (size_t i = 0; i < kNumTransactions; i++) {
std::string key = "key" + ToString(i);
std::string value = "value0";
ASSERT_OK(db->Put(WriteOptions(), key, value));
current_data[key] = value;
}
VerifyKeys(current_data);
for (size_t iter = 0; iter < kNumIterations; iter++) {
auto r = rnd.Next() % (kNumTransactions + 1);
if (r < kNumTransactions) {
std::string key = "key" + ToString(r);
if (transactions[r] == nullptr) {
std::string value = "value" + ToString(versions[r] + 1);
auto* txn = db->BeginTransaction(WriteOptions());
ASSERT_OK(txn->SetName("txn" + ToString(r)));
ASSERT_OK(txn->Put(key, value));
ASSERT_OK(txn->Prepare());
transactions[r] = txn;
} else {
std::string value = "value" + ToString(++versions[r]);
ASSERT_OK(transactions[r]->Commit());
delete transactions[r];
transactions[r] = nullptr;
current_data[key] = value;
}
} else {
auto* snapshot = db->GetSnapshot();
VerifyKeys(current_data, snapshot);
snapshots.push_back(snapshot);
snapshot_data.push_back(current_data);
}
VerifyKeys(current_data);
}
// Take a last snapshot to test compaction with uncommitted prepared
// transaction.
snapshots.push_back(db->GetSnapshot());
snapshot_data.push_back(current_data);
assert(snapshots.size() == snapshot_data.size());
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
ASSERT_OK(db->Flush(FlushOptions()));
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
// cleanup
for (size_t i = 0; i < kNumTransactions; i++) {
if (transactions[i] == nullptr) {
continue;
}
ASSERT_OK(transactions[i]->Commit());
delete transactions[i];
}
for (size_t i = 0; i < snapshots.size(); i++) {
db->ReleaseSnapshot(snapshots[i]);
}
}
// Compaction should not apply the optimization to output key with sequence
// number equal to 0 if the key is not visible to earliest snapshot, based on
// commit sequence number.
TEST_P(WritePreparedTransactionTest,
CompactionShouldKeepSequenceForUncommittedKeys) {
options.disable_auto_compactions = true;
ReOpen();
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto* transaction = db->BeginTransaction(WriteOptions());
ASSERT_OK(transaction->SetName("txn"));
ASSERT_OK(transaction->Put("key1", "value1"));
ASSERT_OK(transaction->Prepare());
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
SequenceNumber seq1 = expected_seq;
ASSERT_OK(db->Put(WriteOptions(), "key2", "value2"));
ASSERT_EQ(++expected_seq, db->GetLatestSequenceNumber());
ASSERT_OK(db->Flush(FlushOptions()));
// Dummy keys to avoid compaction trivially move files and get around actual
// compaction logic.
ASSERT_OK(db->Put(WriteOptions(), "a", "dummy"));
ASSERT_OK(db->Put(WriteOptions(), "z", "dummy"));
ASSERT_OK(db->CompactRange(CompactRangeOptions(), nullptr, nullptr));
VerifyKeys({
{"key1", "NOT_FOUND"},
{"key2", "value2"},
});
VerifyInternalKeys({
// "key1" has not been committed. It keeps its sequence number.
{"key1", "value1", seq1, kTypeValue},
// "key2" is committed and output with seq = 0.
{"key2", "value2", 0, kTypeValue},
});
ASSERT_OK(transaction->Commit());
VerifyKeys({
{"key1", "value1"},
{"key2", "value2"},
});
delete transaction;
}
} // namespace rocksdb
int main(int argc, char** argv) {