Update protection info on recovered logs data (#9875)

Summary:
Update protection info on recovered logs data

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9875

Test Plan:
- Benchmark setup: `TEST_TMPDIR=/dev/shm/100MB_WAL_DB/ ./db_bench -benchmarks=fillrandom -write_buffer_size=1048576000`
- Benchmark command: `TEST_TMPDIR=/dev/shm/100MB_WAL_DB/ /usr/bin/time ./db_bench -use_existing_db=true -benchmarks=overwrite -write_buffer_size=1048576000 -writes=1 -report_open_timing=true`
- Results before this PR
```
OpenDb:     2350.14 milliseconds
OpenDb:     2296.94 milliseconds
OpenDb:     2184.29 milliseconds
OpenDb:     2167.59 milliseconds
OpenDb:     2231.24 milliseconds
OpenDb:     2109.57 milliseconds
OpenDb:     2197.71 milliseconds
OpenDb:     2120.8 milliseconds
OpenDb:     2148.12 milliseconds
OpenDb:     2207.95 milliseconds
```
- Results after this PR
```
OpenDb:     2424.52 milliseconds
OpenDb:     2359.84 milliseconds
OpenDb:     2317.68 milliseconds
OpenDb:     2339.4 milliseconds
OpenDb:     2325.36 milliseconds
OpenDb:     2321.06 milliseconds
OpenDb:     2353.98 milliseconds
OpenDb:     2344.64 milliseconds
OpenDb:     2384.09 milliseconds
OpenDb:     2428.58 milliseconds
```

Mean regressed 7.2% (2201.4 -> 2359.9)

Reviewed By: ajkr

Differential Revision: D36012787

Pulled By: akomurav

fbshipit-source-id: d2aba09f29c6beb2fd0fe8e1e359be910b4ef02a
This commit is contained in:
Anvesh Komuravelli 2022-04-28 14:42:00 -07:00 committed by Facebook GitHub Bot
parent fce65e7e4f
commit aafb377bb5
6 changed files with 204 additions and 31 deletions

View File

@ -947,7 +947,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr);
@ -961,10 +960,15 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
continue;
}
// We create a new batch and initialize with a valid prot_info_ to store
// the data checksums
WriteBatch batch(0, 0, 8, 0);
status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) {
return status;
}
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
if (immutable_db_options_.wal_recovery_mode ==

View File

@ -152,14 +152,6 @@ struct SavePoints {
std::stack<SavePoint, autovector<SavePoint>> stack;
};
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
: content_flags_(0), max_bytes_(max_bytes), rep_() {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader)
? reserved_bytes
: WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
}
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key, size_t default_cf_ts_sz)
: content_flags_(0),
@ -580,14 +572,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false;
if (!handler->WriteAfterCommit()) {
if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kDisabled) {
s = Status::NotSupported(
"WriteCommitted txn tag when write_after_commit_ is disabled (in "
"WritePrepared/WriteUnprepared mode). If it is not due to "
"corruption, the WAL must be emptied before changing the "
"WritePolicy.");
}
if (handler->WriteBeforePrepare()) {
if (handler->WriteBeforePrepare() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported(
"WriteCommitted txn tag when write_before_prepare_ is enabled "
"(in WriteUnprepared mode). If it is not due to corruption, the "
@ -600,7 +594,8 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare();
assert(s.ok());
empty_batch = false;
if (handler->WriteAfterCommit()) {
if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported(
"WritePrepared/WriteUnprepared txn tag when write_after_commit_ "
"is enabled (in default WriteCommitted mode). If it is not due "
@ -614,13 +609,15 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
s = handler->MarkBeginPrepare(true /* unprepared */);
assert(s.ok());
empty_batch = false;
if (handler->WriteAfterCommit()) {
if (handler->WriteAfterCommit() ==
WriteBatch::Handler::OptionState::kEnabled) {
s = Status::NotSupported(
"WriteUnprepared txn tag when write_after_commit_ is enabled (in "
"default WriteCommitted mode). If it is not due to corruption, "
"the WAL must be emptied before changing the WritePolicy.");
}
if (!handler->WriteBeforePrepare()) {
if (handler->WriteBeforePrepare() ==
WriteBatch::Handler::OptionState::kDisabled) {
s = Status::NotSupported(
"WriteUnprepared txn tag when write_before_prepare_ is disabled "
"(in WriteCommitted/WritePrepared mode). If it is not due to "
@ -1494,6 +1491,8 @@ Status WriteBatch::UpdateTimestamps(
return s;
}
namespace {
class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;
@ -1581,9 +1580,24 @@ class MemTableInserter : public WriteBatch::Handler {
return res;
}
void DecrementProtectionInfoIdxForTryAgain() {
if (prot_info_ != nullptr) --prot_info_idx_;
}
void ResetProtectionInfo() {
prot_info_idx_ = 0;
prot_info_ = nullptr;
}
protected:
bool WriteBeforePrepare() const override { return write_before_prepare_; }
bool WriteAfterCommit() const override { return write_after_commit_; }
Handler::OptionState WriteBeforePrepare() const override {
return write_before_prepare_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
Handler::OptionState WriteAfterCommit() const override {
return write_after_commit_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
public:
// cf_mems should not be shared with concurrent inserters
@ -1871,15 +1885,25 @@ class MemTableInserter : public WriteBatch::Handler {
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
const auto* kv_prot_info = NextProtectionInfo();
Status ret_status;
if (kv_prot_info != nullptr) {
// Memtable needs seqno, doesn't need CF ID
auto mem_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
return PutCFImpl(column_family_id, key, value, kTypeValue,
&mem_kv_prot_info);
ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
&mem_kv_prot_info);
} else {
ret_status = PutCFImpl(column_family_id, key, value, kTypeValue,
nullptr /* kv_prot_info */);
}
return PutCFImpl(column_family_id, key, value, kTypeValue,
nullptr /* kv_prot_info */);
// TODO: this assumes that if TryAgain status is returned to the caller,
// the operation is actually tried again. The proper way to do this is to
// pass a `try_again` parameter to the operation itself and decrement
// prot_info_idx_ based on that
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
@ -1926,6 +1950,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
@ -1957,6 +1984,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status =
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
@ -1985,6 +2015,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
assert(ret_status.ok());
@ -2009,6 +2042,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::SingleDelete(rebuilding_trx_,
column_family_id, key);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
@ -2038,6 +2074,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
assert(ret_status.ok());
@ -2092,6 +2131,9 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::DeleteRange(
rebuilding_trx_, column_family_id, begin_key, end_key);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
@ -2121,6 +2163,9 @@ class MemTableInserter : public WriteBatch::Handler {
} else if (ret_status.ok()) {
MaybeAdvanceSeq(false /* batch_boundary */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
assert(ret_status.ok());
@ -2242,23 +2287,31 @@ class MemTableInserter : public WriteBatch::Handler {
ret_status = WriteBatchInternal::Merge(rebuilding_trx_, column_family_id,
key, value);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
Status PutBlobIndexCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
const auto* kv_prot_info = NextProtectionInfo();
Status ret_status;
if (kv_prot_info != nullptr) {
// Memtable needs seqno, doesn't need CF ID
auto mem_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
// Same as PutCF except for value type.
return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
&mem_kv_prot_info);
ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
&mem_kv_prot_info);
} else {
return PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
nullptr /* kv_prot_info */);
ret_status = PutCFImpl(column_family_id, key, value, kTypeBlobIndex,
nullptr /* kv_prot_info */);
}
if (UNLIKELY(ret_status.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return ret_status;
}
void CheckMemtableFull() {
@ -2401,6 +2454,7 @@ class MemTableInserter : public WriteBatch::Handler {
const auto& batch_info = trx->batches_.begin()->second;
// all inserts must reference this trx log number
log_number_ref_ = batch_info.log_number_;
ResetProtectionInfo();
s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0;
}
@ -2422,6 +2476,10 @@ class MemTableInserter : public WriteBatch::Handler {
const bool batch_boundry = true;
MaybeAdvanceSeq(batch_boundry);
if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return s;
}
@ -2466,6 +2524,7 @@ class MemTableInserter : public WriteBatch::Handler {
return ucmp->timestamp_size();
});
if (s.ok()) {
ResetProtectionInfo();
s = batch_info.batch_->Iterate(this);
log_number_ref_ = 0;
}
@ -2488,6 +2547,10 @@ class MemTableInserter : public WriteBatch::Handler {
constexpr bool batch_boundary = true;
MaybeAdvanceSeq(batch_boundary);
if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return s;
}
@ -2523,6 +2586,8 @@ class MemTableInserter : public WriteBatch::Handler {
}
};
} // namespace
// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
@ -2613,11 +2678,94 @@ Status WriteBatchInternal::InsertInto(
return s;
}
namespace {
// This class updates protection info for a WriteBatch.
class ProtectionInfoUpdater : public WriteBatch::Handler {
public:
explicit ProtectionInfoUpdater(WriteBatch::ProtectionInfo* prot_info)
: prot_info_(prot_info) {}
~ProtectionInfoUpdater() override {}
Status PutCF(uint32_t cf, const Slice& key, const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeValue);
}
Status DeleteCF(uint32_t cf, const Slice& key) override {
return UpdateProtInfo(cf, key, "", kTypeDeletion);
}
Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
return UpdateProtInfo(cf, key, "", kTypeSingleDeletion);
}
Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
const Slice& end_key) override {
return UpdateProtInfo(cf, begin_key, end_key, kTypeRangeDeletion);
}
Status MergeCF(uint32_t cf, const Slice& key, const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeMerge);
}
Status PutBlobIndexCF(uint32_t cf, const Slice& key,
const Slice& val) override {
return UpdateProtInfo(cf, key, val, kTypeBlobIndex);
}
Status MarkBeginPrepare(bool /* unprepare */) override {
return Status::OK();
}
Status MarkEndPrepare(const Slice& /* xid */) override {
return Status::OK();
}
Status MarkCommit(const Slice& /* xid */) override { return Status::OK(); }
Status MarkCommitWithTimestamp(const Slice& /* xid */,
const Slice& /* ts */) override {
return Status::OK();
}
Status MarkRollback(const Slice& /* xid */) override { return Status::OK(); }
Status MarkNoop(bool /* empty_batch */) override { return Status::OK(); }
private:
Status UpdateProtInfo(uint32_t cf, const Slice& key, const Slice& val,
const ValueType op_type) {
if (prot_info_) {
prot_info_->entries_.emplace_back(
ProtectionInfo64().ProtectKVO(key, val, op_type).ProtectC(cf));
}
return Status::OK();
}
// No copy or move.
ProtectionInfoUpdater(const ProtectionInfoUpdater&) = delete;
ProtectionInfoUpdater(ProtectionInfoUpdater&&) = delete;
ProtectionInfoUpdater& operator=(const ProtectionInfoUpdater&) = delete;
ProtectionInfoUpdater& operator=(ProtectionInfoUpdater&&) = delete;
WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
};
} // namespace
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= WriteBatchInternal::kHeader);
assert(b->prot_info_ == nullptr);
b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
// If we have a prot_info_, update protection info entries for the batch.
if (b->prot_info_) {
ProtectionInfoUpdater prot_info_updater(b->prot_info_.get());
return b->Iterate(&prot_info_updater);
}
return Status::OK();
}

View File

@ -63,7 +63,9 @@ struct SavePoint {
class WriteBatch : public WriteBatchBase {
public:
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0);
explicit WriteBatch(size_t reserved_bytes = 0, size_t max_bytes = 0)
: WriteBatch(reserved_bytes, max_bytes, 0, 0) {}
// `protection_bytes_per_key` is the number of bytes used to store
// protection information for each key entry. Currently supported values are
// zero (disabled) and eight.
@ -318,8 +320,17 @@ class WriteBatch : public WriteBatchBase {
protected:
friend class WriteBatchInternal;
virtual bool WriteAfterCommit() const { return true; }
virtual bool WriteBeforePrepare() const { return false; }
enum class OptionState {
kUnknown,
kDisabled,
kEnabled,
};
virtual OptionState WriteAfterCommit() const {
return OptionState::kUnknown;
}
virtual OptionState WriteBeforePrepare() const {
return OptionState::kUnknown;
}
};
Status Iterate(Handler* handler) const;
@ -402,6 +413,9 @@ class WriteBatch : public WriteBatchBase {
struct ProtectionInfo;
size_t GetProtectionBytesPerKey() const;
// Clears prot_info_ if there are no entries.
void ClearProtectionInfoIfEmpty();
private:
friend class WriteBatchInternal;
friend class LocalSavePoint;

View File

@ -2538,7 +2538,10 @@ class InMemoryHandler : public WriteBatch::Handler {
~InMemoryHandler() override {}
protected:
bool WriteAfterCommit() const override { return write_after_commit_; }
Handler::OptionState WriteAfterCommit() const override {
return write_after_commit_ ? Handler::OptionState::kEnabled
: Handler::OptionState::kDisabled;
}
private:
std::stringstream& row_;

View File

@ -374,7 +374,9 @@ Status WritePreparedTxn::RollbackInternal() {
}
protected:
bool WriteAfterCommit() const override { return false; }
Handler::OptionState WriteAfterCommit() const override {
return Handler::OptionState::kDisabled;
}
} rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
*cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
wpt_db_->txn_db_options_.rollback_merge_operands,

View File

@ -1082,7 +1082,9 @@ struct SubBatchCounter : public WriteBatch::Handler {
}
Status MarkBeginPrepare(bool) override { return Status::OK(); }
Status MarkRollback(const Slice&) override { return Status::OK(); }
bool WriteAfterCommit() const override { return false; }
Handler::OptionState WriteAfterCommit() const override {
return Handler::OptionState::kDisabled;
}
};
SnapshotBackup WritePreparedTxnDB::AssignMinMaxSeqs(const Snapshot* snapshot,