rocksdb/db/write_batch.cc
Yi Wu 07bdcb91fe New WriteImpl to pipeline WAL/memtable write
Summary:
PipelineWriteImpl is an alternative approach to WriteImpl. In WriteImpl, only one thread is allow to write at the same time. This thread will do both WAL and memtable writes for all write threads in the write group. Pending writers wait in queue until the current writer finishes. In the pipeline write approach, two queue is maintained: one WAL writer queue and one memtable writer queue. All writers (regardless of whether they need to write WAL) will still need to first join the WAL writer queue, and after the house keeping work and WAL writing, they will need to join memtable writer queue if needed. The benefit of this approach is that
1. Writers without memtable writes (e.g. the prepare phase of two phase commit) can exit write thread once WAL write is finish. They don't need to wait for memtable writes in case of group commit.
2. Pending writers only need to wait for previous WAL writer finish to be able to join the write thread, instead of wait also for previous memtable writes.

Merging #2056 and #2058 into this PR.
Closes https://github.com/facebook/rocksdb/pull/2286

Differential Revision: D5054606

Pulled By: yiwu-arbug

fbshipit-source-id: ee5b11efd19d3e39d6b7210937b11cefdd4d1c8d
2017-05-19 14:26:42 -07:00

1398 lines
47 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// WriteBatch::rep_ :=
// sequence: fixed64
// count: fixed32
// data: record[count]
// record :=
// kTypeValue varstring varstring
// kTypeDeletion varstring
// kTypeSingleDeletion varstring
// kTypeMerge varstring varstring
// kTypeColumnFamilyValue varint32 varstring varstring
// kTypeColumnFamilyDeletion varint32 varstring varstring
// kTypeColumnFamilySingleDeletion varint32 varstring varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// kTypeBeginPrepareXID varstring
// kTypeEndPrepareXID
// kTypeCommitXID varstring
// kTypeRollbackXID varstring
// kTypeNoop
// varstring :=
// len: varint32
// data: uint8[len]
#include "rocksdb/write_batch.h"
#include <map>
#include <stack>
#include <stdexcept>
#include <type_traits>
#include <vector>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/dbformat.h"
#include "db/flush_scheduler.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/snapshot_impl.h"
#include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "rocksdb/merge_operator.h"
#include "util/coding.h"
#include "util/string_util.h"
namespace rocksdb {
// anon namespace for file-local types
namespace {
enum ContentFlags : uint32_t {
DEFERRED = 1 << 0,
HAS_PUT = 1 << 1,
HAS_DELETE = 1 << 2,
HAS_SINGLE_DELETE = 1 << 3,
HAS_MERGE = 1 << 4,
HAS_BEGIN_PREPARE = 1 << 5,
HAS_END_PREPARE = 1 << 6,
HAS_COMMIT = 1 << 7,
HAS_ROLLBACK = 1 << 8,
HAS_DELETE_RANGE = 1 << 9,
};
struct BatchContentClassifier : public WriteBatch::Handler {
uint32_t content_flags = 0;
Status PutCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_PUT;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE;
return Status::OK();
}
Status SingleDeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_SINGLE_DELETE;
return Status::OK();
}
Status DeleteRangeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE_RANGE;
return Status::OK();
}
Status MergeCF(uint32_t, const Slice&, const Slice&) override {
content_flags |= ContentFlags::HAS_MERGE;
return Status::OK();
}
Status MarkBeginPrepare() override {
content_flags |= ContentFlags::HAS_BEGIN_PREPARE;
return Status::OK();
}
Status MarkEndPrepare(const Slice&) override {
content_flags |= ContentFlags::HAS_END_PREPARE;
return Status::OK();
}
Status MarkCommit(const Slice&) override {
content_flags |= ContentFlags::HAS_COMMIT;
return Status::OK();
}
Status MarkRollback(const Slice&) override {
content_flags |= ContentFlags::HAS_ROLLBACK;
return Status::OK();
}
};
} // anon namespace
struct SavePoints {
std::stack<SavePoint> stack;
};
WriteBatch::WriteBatch(size_t reserved_bytes, size_t max_bytes)
: save_points_(nullptr), content_flags_(0), max_bytes_(max_bytes), rep_() {
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
reserved_bytes : WriteBatchInternal::kHeader);
rep_.resize(WriteBatchInternal::kHeader);
}
WriteBatch::WriteBatch(const std::string& rep)
: save_points_(nullptr),
content_flags_(ContentFlags::DEFERRED),
max_bytes_(0),
rep_(rep) {}
WriteBatch::WriteBatch(const WriteBatch& src)
: save_points_(src.save_points_),
wal_term_point_(src.wal_term_point_),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
rep_(src.rep_) {}
WriteBatch::WriteBatch(WriteBatch&& src)
: save_points_(std::move(src.save_points_)),
wal_term_point_(std::move(src.wal_term_point_)),
content_flags_(src.content_flags_.load(std::memory_order_relaxed)),
max_bytes_(src.max_bytes_),
rep_(std::move(src.rep_)) {}
WriteBatch& WriteBatch::operator=(const WriteBatch& src) {
if (&src != this) {
this->~WriteBatch();
new (this) WriteBatch(src);
}
return *this;
}
WriteBatch& WriteBatch::operator=(WriteBatch&& src) {
if (&src != this) {
this->~WriteBatch();
new (this) WriteBatch(std::move(src));
}
return *this;
}
WriteBatch::~WriteBatch() { delete save_points_; }
WriteBatch::Handler::~Handler() { }
void WriteBatch::Handler::LogData(const Slice& blob) {
// If the user has not specified something to do with blobs, then we ignore
// them.
}
bool WriteBatch::Handler::Continue() {
return true;
}
void WriteBatch::Clear() {
rep_.clear();
rep_.resize(WriteBatchInternal::kHeader);
content_flags_.store(0, std::memory_order_relaxed);
if (save_points_ != nullptr) {
while (!save_points_->stack.empty()) {
save_points_->stack.pop();
}
}
wal_term_point_.clear();
}
int WriteBatch::Count() const {
return WriteBatchInternal::Count(this);
}
uint32_t WriteBatch::ComputeContentFlags() const {
auto rv = content_flags_.load(std::memory_order_relaxed);
if ((rv & ContentFlags::DEFERRED) != 0) {
BatchContentClassifier classifier;
Iterate(&classifier);
rv = classifier.content_flags;
// this method is conceptually const, because it is performing a lazy
// computation that doesn't affect the abstract state of the batch.
// content_flags_ is marked mutable so that we can perform the
// following assignment
content_flags_.store(rv, std::memory_order_relaxed);
}
return rv;
}
void WriteBatch::MarkWalTerminationPoint() {
wal_term_point_.size = GetDataSize();
wal_term_point_.count = Count();
wal_term_point_.content_flags = content_flags_;
}
bool WriteBatch::HasPut() const {
return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
}
bool WriteBatch::HasDelete() const {
return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
}
bool WriteBatch::HasSingleDelete() const {
return (ComputeContentFlags() & ContentFlags::HAS_SINGLE_DELETE) != 0;
}
bool WriteBatch::HasDeleteRange() const {
return (ComputeContentFlags() & ContentFlags::HAS_DELETE_RANGE) != 0;
}
bool WriteBatch::HasMerge() const {
return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
}
bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
assert(input != nullptr && key != nullptr);
// Skip tag byte
input->remove_prefix(1);
if (cf_record) {
// Skip column_family bytes
uint32_t cf;
if (!GetVarint32(input, &cf)) {
return false;
}
}
// Extract key
return GetLengthPrefixedSlice(input, key);
}
bool WriteBatch::HasBeginPrepare() const {
return (ComputeContentFlags() & ContentFlags::HAS_BEGIN_PREPARE) != 0;
}
bool WriteBatch::HasEndPrepare() const {
return (ComputeContentFlags() & ContentFlags::HAS_END_PREPARE) != 0;
}
bool WriteBatch::HasCommit() const {
return (ComputeContentFlags() & ContentFlags::HAS_COMMIT) != 0;
}
bool WriteBatch::HasRollback() const {
return (ComputeContentFlags() & ContentFlags::HAS_ROLLBACK) != 0;
}
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
uint32_t* column_family, Slice* key,
Slice* value, Slice* blob, Slice* xid) {
assert(key != nullptr && value != nullptr);
*tag = (*input)[0];
input->remove_prefix(1);
*column_family = 0; // default
switch (*tag) {
case kTypeColumnFamilyValue:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch Put");
}
// intentional fallthrough
case kTypeValue:
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch Put");
}
break;
case kTypeColumnFamilyDeletion:
case kTypeColumnFamilySingleDeletion:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch Delete");
}
// intentional fallthrough
case kTypeDeletion:
case kTypeSingleDeletion:
if (!GetLengthPrefixedSlice(input, key)) {
return Status::Corruption("bad WriteBatch Delete");
}
break;
case kTypeColumnFamilyRangeDeletion:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch DeleteRange");
}
// intentional fallthrough
case kTypeRangeDeletion:
// for range delete, "key" is begin_key, "value" is end_key
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch DeleteRange");
}
break;
case kTypeColumnFamilyMerge:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch Merge");
}
// intentional fallthrough
case kTypeMerge:
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch Merge");
}
break;
case kTypeLogData:
assert(blob != nullptr);
if (!GetLengthPrefixedSlice(input, blob)) {
return Status::Corruption("bad WriteBatch Blob");
}
break;
case kTypeNoop:
case kTypeBeginPrepareXID:
break;
case kTypeEndPrepareXID:
if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad EndPrepare XID");
}
break;
case kTypeCommitXID:
if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad Commit XID");
}
break;
case kTypeRollbackXID:
if (!GetLengthPrefixedSlice(input, xid)) {
return Status::Corruption("bad Rollback XID");
}
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
return Status::OK();
}
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < WriteBatchInternal::kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(WriteBatchInternal::kHeader);
Slice key, value, blob, xid;
int found = 0;
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
char tag = 0;
uint32_t column_family = 0; // default
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid);
if (!s.ok()) {
return s;
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value);
found++;
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key);
found++;
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_SINGLE_DELETE));
s = handler->SingleDeleteCF(column_family, key);
found++;
break;
case kTypeColumnFamilyRangeDeletion:
case kTypeRangeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE_RANGE));
s = handler->DeleteRangeCF(column_family, key, value);
found++;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_MERGE));
s = handler->MergeCF(column_family, key, value);
found++;
break;
case kTypeLogData:
handler->LogData(blob);
break;
case kTypeBeginPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_BEGIN_PREPARE));
handler->MarkBeginPrepare();
break;
case kTypeEndPrepareXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_END_PREPARE));
handler->MarkEndPrepare(xid);
break;
case kTypeCommitXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_COMMIT));
handler->MarkCommit(xid);
break;
case kTypeRollbackXID:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_ROLLBACK));
handler->MarkRollback(xid);
break;
case kTypeNoop:
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (!s.ok()) {
return s;
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
int WriteBatchInternal::Count(const WriteBatch* b) {
return DecodeFixed32(b->rep_.data() + 8);
}
void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
EncodeFixed32(&b->rep_[8], n);
}
SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
return SequenceNumber(DecodeFixed64(b->rep_.data()));
}
void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
return WriteBatchInternal::kHeader;
}
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeValue));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
value);
}
Status WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeValue));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, key);
PutLengthPrefixedSliceParts(&b->rep_, value);
b->content_flags_.store(
b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) {
return WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key,
value);
}
Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
b->rep_.push_back(static_cast<char>(kTypeNoop));
return Status::OK();
}
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
// a manually constructed batch can only contain one prepare section
assert(b->rep_[12] == static_cast<char>(kTypeNoop));
// all savepoints up to this point are cleared
if (b->save_points_ != nullptr) {
while (!b->save_points_->stack.empty()) {
b->save_points_->stack.pop();
}
}
// rewrite noop as begin marker
b->rep_[12] = static_cast<char>(kTypeBeginPrepareXID);
b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_END_PREPARE |
ContentFlags::HAS_BEGIN_PREPARE,
std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::MarkCommit(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeCommitXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_COMMIT,
std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::MarkRollback(WriteBatch* b, const Slice& xid) {
b->rep_.push_back(static_cast<char>(kTypeRollbackXID));
PutLengthPrefixedSlice(&b->rep_, xid);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_ROLLBACK,
std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
const Slice& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
key);
}
Status WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
return WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family),
key);
}
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
uint32_t column_family_id,
const Slice& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_SINGLE_DELETE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
return WriteBatchInternal::SingleDelete(
this, GetColumnFamilyID(column_family), key);
}
Status WriteBatchInternal::SingleDelete(WriteBatch* b,
uint32_t column_family_id,
const SliceParts& key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeSingleDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilySingleDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_SINGLE_DELETE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) {
return WriteBatchInternal::SingleDelete(
this, GetColumnFamilyID(column_family), key);
}
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, begin_key);
PutLengthPrefixedSlice(&b->rep_, end_key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE_RANGE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
begin_key, end_key);
}
Status WriteBatchInternal::DeleteRange(WriteBatch* b, uint32_t column_family_id,
const SliceParts& begin_key,
const SliceParts& end_key) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeRangeDeletion));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyRangeDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, begin_key);
PutLengthPrefixedSliceParts(&b->rep_, end_key);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_DELETE_RANGE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::DeleteRange(ColumnFamilyHandle* column_family,
const SliceParts& begin_key,
const SliceParts& end_key) {
return WriteBatchInternal::DeleteRange(this, GetColumnFamilyID(column_family),
begin_key, end_key);
}
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeMerge));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_MERGE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
value);
}
Status WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key,
const SliceParts& value) {
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeMerge));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&b->rep_, key);
PutLengthPrefixedSliceParts(&b->rep_, value);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_MERGE,
std::memory_order_relaxed);
return save.commit();
}
Status WriteBatch::Merge(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) {
return WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key,
value);
}
Status WriteBatch::PutLogData(const Slice& blob) {
LocalSavePoint save(this);
rep_.push_back(static_cast<char>(kTypeLogData));
PutLengthPrefixedSlice(&rep_, blob);
return save.commit();
}
void WriteBatch::SetSavePoint() {
if (save_points_ == nullptr) {
save_points_ = new SavePoints();
}
// Record length and count of current batch of writes.
save_points_->stack.push(SavePoint(
GetDataSize(), Count(), content_flags_.load(std::memory_order_relaxed)));
}
Status WriteBatch::RollbackToSavePoint() {
if (save_points_ == nullptr || save_points_->stack.size() == 0) {
return Status::NotFound();
}
// Pop the most recent savepoint off the stack
SavePoint savepoint = save_points_->stack.top();
save_points_->stack.pop();
assert(savepoint.size <= rep_.size());
assert(savepoint.count <= Count());
if (savepoint.size == rep_.size()) {
// No changes to rollback
} else if (savepoint.size == 0) {
// Rollback everything
Clear();
} else {
rep_.resize(savepoint.size);
WriteBatchInternal::SetCount(this, savepoint.count);
content_flags_.store(savepoint.content_flags, std::memory_order_relaxed);
}
return Status::OK();
}
Status WriteBatch::PopSavePoint() {
if (save_points_ == nullptr || save_points_->stack.size() == 0) {
return Status::NotFound();
}
// Pop the most recent savepoint off the stack
save_points_->stack.pop();
return Status::OK();
}
class MemTableInserter : public WriteBatch::Handler {
SequenceNumber sequence_;
ColumnFamilyMemTables* const cf_mems_;
FlushScheduler* const flush_scheduler_;
const bool ignore_missing_column_families_;
const uint64_t recovering_log_number_;
// log number that all Memtables inserted into should reference
uint64_t log_number_ref_;
DBImpl* db_;
const bool concurrent_memtable_writes_;
bool post_info_created_;
bool* has_valid_writes_;
// On some (!) platforms just default creating
// a map is too expensive in the Write() path as they
// cause memory allocations though unused.
// Make creation optional but do not incur
// unique_ptr additional allocation
using
MemPostInfoMap = std::map<MemTable*, MemTablePostProcessInfo>;
using
PostMapType = std::aligned_storage<sizeof(MemPostInfoMap)>::type;
PostMapType mem_post_info_map_;
// current recovered transaction we are rebuilding (recovery)
WriteBatch* rebuilding_trx_;
MemPostInfoMap& GetPostMap() {
assert(concurrent_memtable_writes_);
if(!post_info_created_) {
new (&mem_post_info_map_) MemPostInfoMap();
post_info_created_ = true;
}
return *reinterpret_cast<MemPostInfoMap*>(&mem_post_info_map_);
}
public:
// cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovering_log_number, DB* db,
bool concurrent_memtable_writes,
bool* has_valid_writes = nullptr)
: sequence_(sequence),
cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler),
ignore_missing_column_families_(ignore_missing_column_families),
recovering_log_number_(recovering_log_number),
log_number_ref_(0),
db_(reinterpret_cast<DBImpl*>(db)),
concurrent_memtable_writes_(concurrent_memtable_writes),
post_info_created_(false),
has_valid_writes_(has_valid_writes),
rebuilding_trx_(nullptr) {
assert(cf_mems_);
}
~MemTableInserter() {
if (post_info_created_) {
reinterpret_cast<MemPostInfoMap*>
(&mem_post_info_map_)->~MemPostInfoMap();
}
}
MemTableInserter(const MemTableInserter&) = delete;
MemTableInserter& operator=(const MemTableInserter&) = delete;
void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
SequenceNumber get_final_sequence() const { return sequence_; }
void PostProcess() {
assert(concurrent_memtable_writes_);
// If post info was not created there is nothing
// to process and no need to create on demand
if(post_info_created_) {
for (auto& pair : GetPostMap()) {
pair.first->BatchPostProcess(pair.second);
}
}
}
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// If we are in a concurrent mode, it is the caller's responsibility
// to clone the original ColumnFamilyMemTables so that each thread
// has its own instance. Otherwise, it must be guaranteed that there
// is no concurrent access
bool found = cf_mems_->Seek(column_family_id);
if (!found) {
if (ignore_missing_column_families_) {
*s = Status::OK();
} else {
*s = Status::InvalidArgument(
"Invalid column family specified in write batch");
}
return false;
}
if (recovering_log_number_ != 0 &&
recovering_log_number_ < cf_mems_->GetLogNumber()) {
// This is true only in recovery environment (recovering_log_number_ is
// always 0 in
// non-recovery, regular write code-path)
// * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
// column
// family already contains updates from this log. We can't apply updates
// twice because of update-in-place or merge workloads -- ignore the
// update
*s = Status::OK();
return false;
}
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
if (log_number_ref_ > 0) {
cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
}
return true;
}
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) {
mem->Add(sequence_, kTypeValue, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
} else if (moptions->inplace_callback == nullptr) {
assert(!concurrent_memtable_writes_);
mem->Update(sequence_, key, value);
RecordTick(moptions->statistics, NUMBER_KEYS_UPDATED);
} else {
assert(!concurrent_memtable_writes_);
if (mem->UpdateCallback(sequence_, key, value)) {
} else {
// key not found in memtable. Do sst get, update, add
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions ropts;
ropts.snapshot = &read_from_snapshot;
std::string prev_value;
std::string merged_value;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
Status s = Status::NotSupported();
if (db_ != nullptr && recovering_log_number_ == 0) {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
s = db_->Get(ropts, cf_handle, key, &prev_value);
}
char* prev_buffer = const_cast<char*>(prev_value.c_str());
uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
auto status = moptions->inplace_callback(s.ok() ? prev_buffer : nullptr,
s.ok() ? &prev_size : nullptr,
value, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value.
mem->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
} else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value.
mem->Add(sequence_, kTypeValue, key, Slice(merged_value));
RecordTick(moptions->statistics, NUMBER_KEYS_WRITTEN);
}
}
}
// Since all Puts are logged in trasaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++;
CheckMemtableFull();
return Status::OK();
}
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType delete_type) {
MemTable* mem = cf_mems_->GetMemTable();
mem->Add(sequence_, delete_type, key, value, concurrent_memtable_writes_,
get_post_process_info(mem));
sequence_++;
CheckMemtableFull();
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Delete(rebuilding_trx_, column_family_id, key);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
return DeleteImpl(column_family_id, key, Slice(), kTypeDeletion);
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::SingleDelete(rebuilding_trx_, column_family_id, key);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
return DeleteImpl(column_family_id, key, Slice(), kTypeSingleDeletion);
}
virtual Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key,
const Slice& end_key) override {
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::DeleteRange(rebuilding_trx_, column_family_id,
begin_key, end_key);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (db_ != nullptr) {
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
if (!cfd->is_delete_range_supported()) {
return Status::NotSupported(
std::string("DeleteRange not supported for table type ") +
cfd->ioptions()->table_factory->Name() + " in CF " +
cfd->GetName());
}
}
return DeleteImpl(column_family_id, begin_key, end_key, kTypeRangeDeletion);
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
assert(!concurrent_memtable_writes_);
if (rebuilding_trx_ != nullptr) {
WriteBatchInternal::Merge(rebuilding_trx_, column_family_id, key, value);
return Status::OK();
}
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions();
bool perform_merge = false;
// If we pass DB through and options.max_successive_merges is hit
// during recovery, Get() will be issued which will try to acquire
// DB mutex and cause deadlock, as DB mutex is already held.
// So we disable merge in recovery
if (moptions->max_successive_merges > 0 && db_ != nullptr &&
recovering_log_number_ == 0) {
LookupKey lkey(key, sequence_);
// Count the number of successive merges at the head
// of the key in the memtable
size_t num_merges = mem->CountSuccessiveMergeEntries(lkey);
if (num_merges >= moptions->max_successive_merges) {
perform_merge = true;
}
}
if (perform_merge) {
// 1) Get the existing value
std::string get_value;
// Pass in the sequence number so that we also include previous merge
// operations in the same batch.
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;
ReadOptions read_options;
read_options.snapshot = &read_from_snapshot;
auto cf_handle = cf_mems_->GetColumnFamilyHandle();
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
db_->Get(read_options, cf_handle, key, &get_value);
Slice get_value_slice = Slice(get_value);
// 2) Apply this merge
auto merge_operator = moptions->merge_operator;
assert(merge_operator);
std::string new_value;
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics, Env::Default());
if (!merge_status.ok()) {
// Failed to merge!
// Store the delta in memtable
perform_merge = false;
} else {
// 3) Add value to memtable
mem->Add(sequence_, kTypeValue, key, new_value);
}
}
if (!perform_merge) {
// Add merge operator to memtable
mem->Add(sequence_, kTypeMerge, key, value);
}
sequence_++;
CheckMemtableFull();
return Status::OK();
}
void CheckMemtableFull() {
if (flush_scheduler_ != nullptr) {
auto* cfd = cf_mems_->current();
assert(cfd != nullptr);
if (cfd->mem()->ShouldScheduleFlush() &&
cfd->mem()->MarkFlushScheduled()) {
// MarkFlushScheduled only returns true if we are the one that
// should take action, so no need to dedup further
flush_scheduler_->ScheduleFlush(cfd);
}
}
}
Status MarkBeginPrepare() override {
assert(rebuilding_trx_ == nullptr);
assert(db_);
if (recovering_log_number_ != 0) {
// during recovery we rebuild a hollow transaction
// from all encountered prepare sections of the wal
if (db_->allow_2pc() == false) {
return Status::NotSupported(
"WAL contains prepared transactions. Open with "
"TransactionDB::Open().");
}
// we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch();
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
} else {
// in non-recovery we ignore prepare markers
// and insert the values directly. making sure we have a
// log for each insertion to reference.
assert(log_number_ref_ > 0);
}
return Status::OK();
}
Status MarkEndPrepare(const Slice& name) override {
assert(db_);
assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
if (recovering_log_number_ != 0) {
assert(db_->allow_2pc());
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_);
rebuilding_trx_ = nullptr;
} else {
assert(rebuilding_trx_ == nullptr);
assert(log_number_ref_ > 0);
}
return Status::OK();
}
Status MarkCommit(const Slice& name) override {
assert(db_);
Status s;
if (recovering_log_number_ != 0) {
// in recovery when we encounter a commit marker
// we lookup this transaction in our set of rebuilt transactions
// and commit.
auto trx = db_->GetRecoveredTransaction(name.ToString());
// the log contaiting the prepared section may have
// been released in the last incarnation because the
// data was flushed to L0
if (trx != nullptr) {
// at this point individual CF lognumbers will prevent
// duplicate re-insertion of values.
assert(log_number_ref_ == 0);
// all insertes must reference this trx log number
log_number_ref_ = trx->log_number_;
s = trx->batch_->Iterate(this);
log_number_ref_ = 0;
if (s.ok()) {
db_->DeleteRecoveredTransaction(name.ToString());
}
if (has_valid_writes_ != nullptr) {
*has_valid_writes_ = true;
}
}
} else {
// in non recovery we simply ignore this tag
}
return s;
}
Status MarkRollback(const Slice& name) override {
assert(db_);
if (recovering_log_number_ != 0) {
auto trx = db_->GetRecoveredTransaction(name.ToString());
// the log containing the transactions prep section
// may have been released in the previous incarnation
// because we knew it had been rolled back
if (trx != nullptr) {
db_->DeleteRecoveredTransaction(name.ToString());
}
} else {
// in non recovery we simply ignore this tag
}
return Status::OK();
}
private:
MemTablePostProcessInfo* get_post_process_info(MemTable* mem) {
if (!concurrent_memtable_writes_) {
// No need to batch counters locally if we don't use concurrent mode.
return nullptr;
}
return &GetPostMap()[mem];
}
};
// This function can only be called in these conditions:
// 1) During Recovery()
// 2) During Write(), in a single-threaded write thread
// 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto(WriteThread::WriteGroup& write_group,
SequenceNumber sequence,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes) {
MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, recovery_log_number,
db, concurrent_memtable_writes);
for (auto w : write_group) {
if (!w->ShouldWriteToMemtable()) {
continue;
}
inserter.set_log_number_ref(w->log_ref);
w->status = w->batch->Iterate(&inserter);
if (!w->status.ok()) {
return w->status;
}
}
return Status::OK();
}
Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
bool concurrent_memtable_writes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(writer->batch),
memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
concurrent_memtable_writes);
assert(writer->ShouldWriteToMemtable());
inserter.set_log_number_ref(writer->log_ref);
Status s = writer->batch->Iterate(&inserter);
if (concurrent_memtable_writes) {
inserter.PostProcess();
}
return s;
}
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, bool ignore_missing_column_families,
uint64_t log_number, DB* db, bool concurrent_memtable_writes,
SequenceNumber* last_seq_used, bool* has_valid_writes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables,
flush_scheduler, ignore_missing_column_families,
log_number, db, concurrent_memtable_writes,
has_valid_writes);
Status s = batch->Iterate(&inserter);
if (last_seq_used != nullptr) {
*last_seq_used = inserter.get_final_sequence();
}
if (concurrent_memtable_writes) {
inserter.PostProcess();
}
return s;
}
Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= WriteBatchInternal::kHeader);
b->rep_.assign(contents.data(), contents.size());
b->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
return Status::OK();
}
Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
const bool wal_only) {
size_t src_len;
int src_count;
uint32_t src_flags;
const SavePoint& batch_end = src->GetWalTerminationPoint();
if (wal_only && !batch_end.is_cleared()) {
src_len = batch_end.size - WriteBatchInternal::kHeader;
src_count = batch_end.count;
src_flags = batch_end.content_flags;
} else {
src_len = src->rep_.size() - WriteBatchInternal::kHeader;
src_count = Count(src);
src_flags = src->content_flags_.load(std::memory_order_relaxed);
}
SetCount(dst, Count(dst) + src_count);
assert(src->rep_.size() >= WriteBatchInternal::kHeader);
dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader, src_len);
dst->content_flags_.store(
dst->content_flags_.load(std::memory_order_relaxed) | src_flags,
std::memory_order_relaxed);
return Status::OK();
}
size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
size_t rightByteSize) {
if (leftByteSize == 0 || rightByteSize == 0) {
return leftByteSize + rightByteSize;
} else {
return leftByteSize + rightByteSize - WriteBatchInternal::kHeader;
}
}
} // namespace rocksdb