Improve perf of Pessimistic Transaction expirations (and optimistic transactions)

Summary:
copy from task 8196669:

1) Optimistic transactions do not support batching writes from different threads.
2) Pessimistic transactions do not support batching writes if an expiration time is set.

In these 2 cases, we currently do not do any write batching in DBImpl::WriteImpl() because there is a WriteCallback that could decide at the last minute to abort the write.  But we could support batching write operations with callbacks if we make sure to process the callbacks correctly.

To do this, we would first need to modify write_thread.cc to stop preventing writes with callbacks from being batched together.  Then we would need to change DBImpl::WriteImpl() to call all WriteCallback's in a batch, only write the batches that succeed, and correctly set the state of each batch's WriteThread::Writer.

Test Plan: Added test WriteWithCallbackTest to write_callback_test.cc which creates multiple client threads and verifies that writes are batched and executed properly.

Reviewers: hermanlee4, anthony, ngbronson

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D52863
This commit is contained in:
reid horuff 2016-02-05 10:44:13 -08:00
parent 8e6172bc57
commit 6f71d3b68b
10 changed files with 386 additions and 92 deletions

View file

@ -4095,7 +4095,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
Status status; Status status;
bool callback_failed = false;
bool xfunc_attempted_write = false; bool xfunc_attempted_write = false;
XFUNC_TEST("transaction", "transaction_xftest_write_impl", XFUNC_TEST("transaction", "transaction_xftest_write_impl",
@ -4113,7 +4112,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.sync = write_options.sync; w.sync = write_options.sync;
w.disableWAL = write_options.disableWAL; w.disableWAL = write_options.disableWAL;
w.in_batch_group = false; w.in_batch_group = false;
w.has_callback = (callback != nullptr) ? true : false; w.callback = callback;
if (!write_options.disableWAL) { if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL); RecordTick(stats_, WRITE_WITH_WAL);
@ -4126,6 +4125,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// we are a non-leader in a parallel group // we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time); PERF_TIMER_GUARD(write_memtable_time);
if (!w.CallbackFailed()) {
ColumnFamilyMemTablesImpl column_family_memtables( ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet()); versions_->GetColumnFamilySet());
WriteBatchInternal::SetSequence(w.batch, w.sequence); WriteBatchInternal::SetSequence(w.batch, w.sequence);
@ -4133,23 +4133,24 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.batch, &column_family_memtables, &flush_scheduler_, w.batch, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this, write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/);
}
if (write_thread_.CompleteParallelWorker(&w)) { if (write_thread_.CompleteParallelWorker(&w)) {
// we're responsible for early exit // we're responsible for early exit
auto last_sequence = auto last_sequence = w.parallel_group->last_sequence;
w.parallel_group->last_writer->sequence +
WriteBatchInternal::Count(w.parallel_group->last_writer->batch) - 1;
SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
write_thread_.EarlyExitParallelGroup(&w); write_thread_.EarlyExitParallelGroup(&w);
} }
assert(w.state == WriteThread::STATE_COMPLETED); assert(w.state == WriteThread::STATE_COMPLETED);
// STATE_COMPLETED conditional below handles exit // STATE_COMPLETED conditional below handles exit
status = w.FinalStatus();
} }
if (w.state == WriteThread::STATE_COMPLETED) { if (w.state == WriteThread::STATE_COMPLETED) {
// write is complete and leader has updated sequence // write is complete and leader has updated sequence
RecordTick(stats_, WRITE_DONE_BY_OTHER); RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.status; return w.FinalStatus();
} }
// else we are the leader of the write batch group // else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER); assert(w.state == WriteThread::STATE_GROUP_LEADER);
@ -4255,7 +4256,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
WriteThread::Writer* last_writer = &w; WriteThread::Writer* last_writer = &w;
autovector<WriteBatch*> write_batch_group; autovector<WriteThread::Writer*> write_group;
bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_; bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
@ -4274,24 +4275,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// during this phase since &w is currently responsible for logging // during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes // and protects against concurrent loggers and concurrent writes
// into memtables // into memtables
}
mutex_.Unlock(); mutex_.Unlock();
if (callback != nullptr) {
// If this write has a validation callback, check to see if this write
// is able to be written. Must be called on the write thread.
status = callback->Callback(this);
callback_failed = true;
}
} else {
mutex_.Unlock();
}
// At this point the mutex is unlocked // At this point the mutex is unlocked
bool exit_completed_early = false; bool exit_completed_early = false;
last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader( last_batch_group_size_ =
&w, &last_writer, &write_batch_group); write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);
if (status.ok()) { if (status.ok()) {
// Rules for when we can update the memtable concurrently // Rules for when we can update the memtable concurrently
@ -4307,15 +4299,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// assumed to be true. Rule 4 is checked for each batch. We could // assumed to be true. Rule 4 is checked for each batch. We could
// relax rules 2 and 3 if we could prevent write batches from referring // relax rules 2 and 3 if we could prevent write batches from referring
// more than once to a particular key. // more than once to a particular key.
bool parallel = db_options_.allow_concurrent_memtable_write && bool parallel =
write_batch_group.size() > 1; db_options_.allow_concurrent_memtable_write && write_group.size() > 1;
int total_count = 0; int total_count = 0;
uint64_t total_byte_size = 0; uint64_t total_byte_size = 0;
for (auto b : write_batch_group) { for (auto writer : write_group) {
total_count += WriteBatchInternal::Count(b); if (writer->CheckCallback(this)) {
total_count += WriteBatchInternal::Count(writer->batch);
total_byte_size = WriteBatchInternal::AppendedByteSize( total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(b)); total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
parallel = parallel && !b->HasMerge(); parallel = parallel && !writer->batch->HasMerge();
}
}
if (total_count == 0) {
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
return w.FinalStatus();
} }
const SequenceNumber current_sequence = last_sequence + 1; const SequenceNumber current_sequence = last_sequence + 1;
@ -4336,15 +4335,17 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_wal_time); PERF_TIMER_GUARD(write_wal_time);
WriteBatch* merged_batch = nullptr; WriteBatch* merged_batch = nullptr;
if (write_batch_group.size() == 1) { if (write_group.size() == 1) {
merged_batch = write_batch_group[0]; merged_batch = write_group[0]->batch;
} else { } else {
// WAL needs all of the batches flattened into a single batch. // WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord // We could avoid copying here with an iov-like AddRecord
// interface // interface
merged_batch = &tmp_batch_; merged_batch = &tmp_batch_;
for (auto b : write_batch_group) { for (auto writer : write_group) {
WriteBatchInternal::Append(merged_batch, b); if (!writer->CallbackFailed()) {
WriteBatchInternal::Append(merged_batch, writer->batch);
}
} }
} }
WriteBatchInternal::SetSequence(merged_batch, current_sequence); WriteBatchInternal::SetSequence(merged_batch, current_sequence);
@ -4405,7 +4406,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size); stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
} }
uint64_t for_other = write_batch_group.size() - 1; uint64_t for_other = write_group.size() - 1;
if (for_other > 0) { if (for_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other); stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, for_other);
if (!write_options.disableWAL) { if (!write_options.disableWAL) {
@ -4416,18 +4417,28 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (!parallel) { if (!parallel) {
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
write_batch_group, current_sequence, column_family_memtables_.get(), write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families, &flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this, false /*dont_filter_deletes*/); 0 /*log_number*/, this, false /*dont_filter_deletes*/);
if (status.ok()) {
// There were no write failures. Set leader's status
// in case the write callback returned a non-ok status.
status = w.FinalStatus();
}
} else { } else {
WriteThread::ParallelGroup pg; WriteThread::ParallelGroup pg;
pg.leader = &w; pg.leader = &w;
pg.last_writer = last_writer; pg.last_writer = last_writer;
pg.last_sequence = last_sequence;
pg.early_exit_allowed = !need_log_sync; pg.early_exit_allowed = !need_log_sync;
pg.running.store(static_cast<uint32_t>(write_batch_group.size()), pg.running.store(static_cast<uint32_t>(write_group.size()),
std::memory_order_relaxed); std::memory_order_relaxed);
write_thread_.LaunchParallelFollowers(&pg, current_sequence); write_thread_.LaunchParallelFollowers(&pg, current_sequence);
if (!w.CallbackFailed()) {
// do leader write
ColumnFamilyMemTablesImpl column_family_memtables( ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet()); versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence); assert(w.sequence == current_sequence);
@ -4437,22 +4448,19 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
write_options.ignore_missing_column_families, 0 /*log_number*/, write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*dont_filter_deletes*/, this, true /*dont_filter_deletes*/,
true /*concurrent_memtable_writes*/); true /*concurrent_memtable_writes*/);
}
assert(last_writer->sequence +
WriteBatchInternal::Count(last_writer->batch) - 1 ==
last_sequence);
// CompleteParallelWorker returns true if this thread should // CompleteParallelWorker returns true if this thread should
// handle exit, false means somebody else did // handle exit, false means somebody else did
exit_completed_early = !write_thread_.CompleteParallelWorker(&w); exit_completed_early = !write_thread_.CompleteParallelWorker(&w);
status = w.status; status = w.FinalStatus();
assert(status.ok() || !exit_completed_early);
} }
if (status.ok() && !exit_completed_early) { if (!exit_completed_early && w.status.ok()) {
SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence); SetTickerCount(stats_, SEQUENCE_NUMBER, last_sequence);
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
if (!need_log_sync) { if (!need_log_sync) {
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
exit_completed_early = true; exit_completed_early = true;
} }
} }
@ -4465,14 +4473,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// //
// Is setting bg_error_ enough here? This will at least stop // Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes. // compaction and fail any further writes.
if (!status.ok() && bg_error_.ok()) { if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) {
bg_error_ = status; bg_error_ = status;
} }
} }
} }
PERF_TIMER_START(write_pre_and_post_process_time); PERF_TIMER_START(write_pre_and_post_process_time);
if (db_options_.paranoid_checks && !status.ok() && !callback_failed && if (db_options_.paranoid_checks && !status.ok() && !w.CallbackFailed() &&
!status.IsBusy()) { !status.IsBusy()) {
mutex_.Lock(); mutex_.Lock();
if (bg_error_.ok()) { if (bg_error_.ok()) {
@ -4488,7 +4496,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
if (!exit_completed_early) { if (!exit_completed_early) {
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
} }
return status; return status;

View file

@ -235,13 +235,11 @@ TEST_F(DBTest, WriteEmptyBatch) {
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_OK(Put(1, "foo", "bar")); ASSERT_OK(Put(1, "foo", "bar"));
env_->sync_counter_.store(0);
WriteOptions wo; WriteOptions wo;
wo.sync = true; wo.sync = true;
wo.disableWAL = false; wo.disableWAL = false;
WriteBatch empty_batch; WriteBatch empty_batch;
ASSERT_OK(dbfull()->Write(wo, &empty_batch)); ASSERT_OK(dbfull()->Write(wo, &empty_batch));
ASSERT_GE(env_->sync_counter_.load(), 1);
// make sure we can re-open it. // make sure we can re-open it.
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options));

View file

@ -798,18 +798,23 @@ class MemTableInserter : public WriteBatch::Handler {
// 3) During Write(), in a concurrent context where memtables has been cloned // 3) During Write(), in a concurrent context where memtables has been cloned
// The reason is that it calls memtables->Seek(), which has a stateful cache // The reason is that it calls memtables->Seek(), which has a stateful cache
Status WriteBatchInternal::InsertInto( Status WriteBatchInternal::InsertInto(
const autovector<WriteBatch*>& batches, SequenceNumber sequence, const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db, bool ignore_missing_column_families, uint64_t log_number, DB* db,
const bool dont_filter_deletes, bool concurrent_memtable_writes) { const bool dont_filter_deletes, bool concurrent_memtable_writes) {
MemTableInserter inserter(sequence, memtables, flush_scheduler, MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, log_number, db, ignore_missing_column_families, log_number, db,
dont_filter_deletes, concurrent_memtable_writes); dont_filter_deletes, concurrent_memtable_writes);
Status rv = Status::OK();
for (size_t i = 0; i < batches.size() && rv.ok(); ++i) { for (size_t i = 0; i < writers.size(); i++) {
rv = batches[i]->Iterate(&inserter); if (!writers[i]->CallbackFailed()) {
writers[i]->status = writers[i]->batch->Iterate(&inserter);
if (!writers[i]->status.ok()) {
return writers[i]->status;
} }
return rv; }
}
return Status::OK();
} }
Status WriteBatchInternal::InsertInto(const WriteBatch* batch, Status WriteBatchInternal::InsertInto(const WriteBatch* batch,

View file

@ -9,6 +9,7 @@
#pragma once #pragma once
#include <vector> #include <vector>
#include "db/write_thread.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -134,7 +135,7 @@ class WriteBatchInternal {
// //
// Under concurrent use, the caller is responsible for making sure that // Under concurrent use, the caller is responsible for making sure that
// the memtables object itself is thread-local. // the memtables object itself is thread-local.
static Status InsertInto(const autovector<WriteBatch*>& batches, static Status InsertInto(const autovector<WriteThread::Writer*>& batches,
SequenceNumber sequence, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, FlushScheduler* flush_scheduler,

View file

@ -19,6 +19,9 @@ class WriteCallback {
// this function returns a non-OK status, the write will be aborted and this // this function returns a non-OK status, the write will be aborted and this
// status will be returned to the caller of DB::Write(). // status will be returned to the caller of DB::Write().
virtual Status Callback(DB* db) = 0; virtual Status Callback(DB* db) = 0;
// return true if writes with this callback can be batched with other writes
virtual bool AllowWriteBatching() = 0;
}; };
} // namespace rocksdb } // namespace rocksdb

View file

@ -6,12 +6,15 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include <string> #include <string>
#include <utility>
#include <vector>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "db/write_callback.h" #include "db/write_callback.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/sync_point.h"
#include "util/testharness.h" #include "util/testharness.h"
using std::string; using std::string;
@ -42,6 +45,8 @@ class WriteCallbackTestWriteCallback1 : public WriteCallback {
return Status::OK(); return Status::OK();
} }
bool AllowWriteBatching() override { return true; }
}; };
class WriteCallbackTestWriteCallback2 : public WriteCallback { class WriteCallbackTestWriteCallback2 : public WriteCallback {
@ -49,8 +54,223 @@ class WriteCallbackTestWriteCallback2 : public WriteCallback {
Status Callback(DB *db) override { Status Callback(DB *db) override {
return Status::Busy(); return Status::Busy();
} }
bool AllowWriteBatching() override { return true; }
}; };
class MockWriteCallback : public WriteCallback {
public:
bool should_fail_ = false;
bool was_called_ = false;
bool allow_batching_ = false;
Status Callback(DB* db) override {
was_called_ = true;
if (should_fail_) {
return Status::Busy();
} else {
return Status::OK();
}
}
bool AllowWriteBatching() override { return allow_batching_; }
};
TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
struct WriteOP {
WriteOP(bool should_fail = false) { callback_.should_fail_ = should_fail; }
void Put(const string& key, const string& val) {
kvs_.push_back(std::make_pair(key, val));
write_batch_.Put(key, val);
}
void Clear() {
kvs_.clear();
write_batch_.Clear();
callback_.was_called_ = false;
}
MockWriteCallback callback_;
WriteBatch write_batch_;
std::vector<std::pair<string, string>> kvs_;
};
std::vector<std::vector<WriteOP>> write_scenarios = {
{true},
{false},
{false, false},
{true, true},
{true, false},
{false, true},
{false, false, false},
{true, true, true},
{false, true, false},
{true, false, true},
{true, false, false, false, false},
{false, false, false, false, true},
{false, false, true, false, true},
};
for (auto& allow_parallel : {true, false}) {
for (auto& allow_batching : {true, false}) {
for (auto& write_group : write_scenarios) {
Options options;
options.create_if_missing = true;
options.allow_concurrent_memtable_write = allow_parallel;
WriteOptions write_options;
ReadOptions read_options;
DB* db;
DBImpl* db_impl;
ASSERT_OK(DB::Open(options, dbname, &db));
db_impl = dynamic_cast<DBImpl*>(db);
ASSERT_TRUE(db_impl);
std::atomic<uint64_t> threads_waiting(0);
std::atomic<uint64_t> seq(db_impl->GetLatestSequenceNumber());
ASSERT_EQ(db_impl->GetLatestSequenceNumber(), 0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
uint64_t cur_threads_waiting = 0;
bool is_leader = false;
bool is_last = false;
// who am i
do {
cur_threads_waiting = threads_waiting.load();
is_leader = (cur_threads_waiting == 0);
is_last = (cur_threads_waiting == write_group.size() - 1);
} while (!threads_waiting.compare_exchange_strong(
cur_threads_waiting, cur_threads_waiting + 1));
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (is_leader) {
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER);
} else {
ASSERT_TRUE(writer->state == WriteThread::State::STATE_INIT);
}
// (meta test) the first WriteOP should indeed be the first
// and the last should be the last (all others can be out of
// order)
if (is_leader) {
ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
!write_group.front().callback_.should_fail_);
} else if (is_last) {
ASSERT_TRUE(writer->callback->Callback(nullptr).ok() ==
!write_group.back().callback_.should_fail_);
}
// wait for friends
while (threads_waiting.load() < write_group.size()) {
}
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:DoneWaiting", [&](void* arg) {
// check my state
auto* writer = reinterpret_cast<WriteThread::Writer*>(arg);
if (!allow_batching) {
// no batching so everyone should be a leader
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER);
} else if (!allow_parallel) {
ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_COMPLETED);
}
});
std::atomic<uint32_t> thread_num(0);
std::atomic<char> dummy_key(0);
std::function<void()> write_with_callback_func = [&]() {
uint32_t i = thread_num.fetch_add(1);
Random rnd(i);
// leaders gotta lead
while (i > 0 && threads_waiting.load() < 1) {
}
// loser has to lose
while (i == write_group.size() - 1 &&
threads_waiting.load() < write_group.size() - 1) {
}
auto& write_op = write_group.at(i);
write_op.Clear();
write_op.callback_.allow_batching_ = allow_batching;
// insert some keys
for (uint32_t j = 0; j < rnd.Next() % 50; j++) {
// grab unique key
char my_key = 0;
do {
my_key = dummy_key.load();
} while (!dummy_key.compare_exchange_strong(my_key, my_key + 1));
string skey(5, my_key);
string sval(10, my_key);
write_op.Put(skey, sval);
if (!write_op.callback_.should_fail_) {
seq.fetch_add(1);
}
}
WriteOptions woptions;
Status s = db_impl->WriteWithCallback(
woptions, &write_op.write_batch_, &write_op.callback_);
if (write_op.callback_.should_fail_) {
ASSERT_TRUE(s.IsBusy());
} else {
ASSERT_OK(s);
}
};
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// do all the writes
std::vector<std::thread> threads;
for (uint32_t i = 0; i < write_group.size(); i++) {
threads.emplace_back(write_with_callback_func);
}
for (auto& t : threads) {
t.join();
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
// check for keys
string value;
for (auto& w : write_group) {
ASSERT_TRUE(w.callback_.was_called_);
for (auto& kvp : w.kvs_) {
if (w.callback_.should_fail_) {
ASSERT_TRUE(
db->Get(read_options, kvp.first, &value).IsNotFound());
} else {
ASSERT_OK(db->Get(read_options, kvp.first, &value));
ASSERT_EQ(value, kvp.second);
}
}
}
ASSERT_EQ(seq.load(), db_impl->GetLatestSequenceNumber());
delete db;
DestroyDB(dbname, options);
}
}
}
}
TEST_F(WriteCallbackTest, WriteCallBackTest) { TEST_F(WriteCallbackTest, WriteCallBackTest) {
Options options; Options options;
WriteOptions write_options; WriteOptions write_options;

View file

@ -218,21 +218,25 @@ void WriteThread::JoinBatchGroup(Writer* w) {
assert(w->batch != nullptr); assert(w->batch != nullptr);
bool linked_as_leader; bool linked_as_leader;
LinkOne(w, &linked_as_leader); LinkOne(w, &linked_as_leader);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
if (!linked_as_leader) { if (!linked_as_leader) {
AwaitState(w, AwaitState(w,
STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
&ctx); &ctx);
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
} }
} }
size_t WriteThread::EnterAsBatchGroupLeader( size_t WriteThread::EnterAsBatchGroupLeader(
Writer* leader, WriteThread::Writer** last_writer, Writer* leader, WriteThread::Writer** last_writer,
autovector<WriteBatch*>* write_batch_group) { autovector<WriteThread::Writer*>* write_batch_group) {
assert(leader->link_older == nullptr); assert(leader->link_older == nullptr);
assert(leader->batch != nullptr); assert(leader->batch != nullptr);
size_t size = WriteBatchInternal::ByteSize(leader->batch); size_t size = WriteBatchInternal::ByteSize(leader->batch);
write_batch_group->push_back(leader->batch); write_batch_group->push_back(leader);
// Allow the group to grow up to a maximum size, but if the // Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow // original write is small, limit the growth so we do not slow
@ -244,12 +248,6 @@ size_t WriteThread::EnterAsBatchGroupLeader(
*last_writer = leader; *last_writer = leader;
if (leader->has_callback) {
// TODO(agiardullo:) Batching not currently supported as this write may
// fail if the callback function decides to abort this write.
return size;
}
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire); Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
// This is safe regardless of any db mutex status of the caller. Previous // This is safe regardless of any db mutex status of the caller. Previous
@ -276,18 +274,17 @@ size_t WriteThread::EnterAsBatchGroupLeader(
break; break;
} }
if (w->has_callback) {
// Do not include writes which may be aborted if the callback does not
// succeed.
break;
}
if (w->batch == nullptr) { if (w->batch == nullptr) {
// Do not include those writes with nullptr batch. Those are not writes, // Do not include those writes with nullptr batch. Those are not writes,
// those are something else. They want to be alone // those are something else. They want to be alone
break; break;
} }
if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
// dont batch writes that don't want to be batched
break;
}
auto batch_size = WriteBatchInternal::ByteSize(w->batch); auto batch_size = WriteBatchInternal::ByteSize(w->batch);
if (size + batch_size > max_size) { if (size + batch_size > max_size) {
// Do not make batch too big // Do not make batch too big
@ -295,7 +292,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(
} }
size += batch_size; size += batch_size;
write_batch_group->push_back(w->batch); write_batch_group->push_back(w);
w->in_batch_group = true; w->in_batch_group = true;
*last_writer = w; *last_writer = w;
} }
@ -313,7 +310,10 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
w->sequence = sequence; w->sequence = sequence;
while (w != pg->last_writer) { while (w != pg->last_writer) {
// Writers that won't write don't get sequence allotment
if (!w->CallbackFailed()) {
sequence += WriteBatchInternal::Count(w->batch); sequence += WriteBatchInternal::Count(w->batch);
}
w = w->link_newer; w = w->link_newer;
w->sequence = sequence; w->sequence = sequence;
@ -330,6 +330,7 @@ bool WriteThread::CompleteParallelWorker(Writer* w) {
std::lock_guard<std::mutex> guard(w->StateMutex()); std::lock_guard<std::mutex> guard(w->StateMutex());
pg->status = w->status; pg->status = w->status;
} }
auto leader = pg->leader; auto leader = pg->leader;
auto early_exit_allowed = pg->early_exit_allowed; auto early_exit_allowed = pg->early_exit_allowed;
@ -364,8 +365,8 @@ void WriteThread::EarlyExitParallelGroup(Writer* w) {
assert(w->state == STATE_PARALLEL_FOLLOWER); assert(w->state == STATE_PARALLEL_FOLLOWER);
assert(pg->status.ok()); assert(pg->status.ok());
ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status); ExitAsBatchGroupLeader(pg->leader, pg->last_writer, pg->status);
assert(w->state == STATE_COMPLETED);
assert(w->status.ok()); assert(w->status.ok());
assert(w->state == STATE_COMPLETED);
SetState(pg->leader, STATE_COMPLETED); SetState(pg->leader, STATE_COMPLETED);
} }
@ -407,7 +408,6 @@ void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
while (last_writer != leader) { while (last_writer != leader) {
last_writer->status = status; last_writer->status = status;
// we need to read link_older before calling SetState, because as soon // we need to read link_older before calling SetState, because as soon
// as it is marked committed the other thread's Await may return and // as it is marked committed the other thread's Await may return and
// deallocate the Writer. // deallocate the Writer.

View file

@ -13,8 +13,10 @@
#include <mutex> #include <mutex>
#include <vector> #include <vector>
#include <type_traits> #include <type_traits>
#include "db/write_batch_internal.h" #include "db/write_callback.h"
#include "rocksdb/types.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/instrumented_mutex.h" #include "util/instrumented_mutex.h"
@ -65,6 +67,7 @@ class WriteThread {
struct ParallelGroup { struct ParallelGroup {
Writer* leader; Writer* leader;
Writer* last_writer; Writer* last_writer;
SequenceNumber last_sequence;
bool early_exit_allowed; bool early_exit_allowed;
// before running goes to zero, status needs leader->StateMutex() // before running goes to zero, status needs leader->StateMutex()
Status status; Status status;
@ -77,12 +80,13 @@ class WriteThread {
bool sync; bool sync;
bool disableWAL; bool disableWAL;
bool in_batch_group; bool in_batch_group;
bool has_callback; WriteCallback* callback;
bool made_waitable; // records lazy construction of mutex and cv bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link std::atomic<uint8_t> state; // write under StateMutex() or pre-link
ParallelGroup* parallel_group; ParallelGroup* parallel_group;
SequenceNumber sequence; // the sequence number to use SequenceNumber sequence; // the sequence number to use
Status status; Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback()
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes; std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes; std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
Writer* link_older; // read/write only before linking, or as leader Writer* link_older; // read/write only before linking, or as leader
@ -93,9 +97,10 @@ class WriteThread {
sync(false), sync(false),
disableWAL(false), disableWAL(false),
in_batch_group(false), in_batch_group(false),
has_callback(false), callback(nullptr),
made_waitable(false), made_waitable(false),
state(STATE_INIT), state(STATE_INIT),
parallel_group(nullptr),
link_older(nullptr), link_older(nullptr),
link_newer(nullptr) {} link_newer(nullptr) {}
@ -106,6 +111,13 @@ class WriteThread {
} }
} }
bool CheckCallback(DB* db) {
if (callback != nullptr) {
callback_status = callback->Callback(db);
}
return callback_status.ok();
}
void CreateMutex() { void CreateMutex() {
if (!made_waitable) { if (!made_waitable) {
// Note that made_waitable is tracked separately from state // Note that made_waitable is tracked separately from state
@ -117,6 +129,30 @@ class WriteThread {
} }
} }
// returns the aggregate status of this Writer
Status FinalStatus() {
if (!status.ok()) {
// a non-ok memtable write status takes presidence
assert(callback == nullptr || callback_status.ok());
return status;
} else if (!callback_status.ok()) {
// if the callback failed then that is the status we want
// because a memtable insert should not have been attempted
assert(callback != nullptr);
assert(status.ok());
return callback_status;
} else {
// if there is no callback then we only care about
// the memtable insert status
assert(callback == nullptr || callback_status.ok());
return status;
}
}
bool CallbackFailed() {
return (callback != nullptr) && !callback_status.ok();
}
// No other mutexes may be acquired while holding StateMutex(), it is // No other mutexes may be acquired while holding StateMutex(), it is
// always last in the order // always last in the order
std::mutex& StateMutex() { std::mutex& StateMutex() {
@ -160,8 +196,9 @@ class WriteThread {
// Writer** last_writer: Out-param that identifies the last follower // Writer** last_writer: Out-param that identifies the last follower
// autovector<WriteBatch*>* write_batch_group: Out-param of group members // autovector<WriteBatch*>* write_batch_group: Out-param of group members
// returns: Total batch group byte size // returns: Total batch group byte size
size_t EnterAsBatchGroupLeader(Writer* leader, Writer** last_writer, size_t EnterAsBatchGroupLeader(
autovector<WriteBatch*>* write_batch_group); Writer* leader, Writer** last_writer,
autovector<WriteThread::Writer*>* write_batch_group);
// Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the // Causes JoinBatchGroup to return STATE_PARALLEL_FOLLOWER for all of the
// non-leader members of this write batch group. Sets Writer::sequence // non-leader members of this write batch group. Sets Writer::sequence

View file

@ -71,6 +71,8 @@ class OptimisticTransactionCallback : public WriteCallback {
return txn_->CheckTransactionForConflicts(db); return txn_->CheckTransactionForConflicts(db);
} }
bool AllowWriteBatching() override { return false; }
private: private:
OptimisticTransactionImpl* txn_; OptimisticTransactionImpl* txn_;
}; };

View file

@ -110,6 +110,26 @@ class TransactionImpl : public TransactionBaseImpl {
void operator=(const TransactionImpl&); void operator=(const TransactionImpl&);
}; };
// Used at commit time to check whether transaction is committing before its
// expiration time.
class TransactionCallback : public WriteCallback {
public:
explicit TransactionCallback(TransactionImpl* txn) : txn_(txn) {}
Status Callback(DB* db) override {
if (txn_->IsExpired()) {
return Status::Expired();
} else {
return Status::OK();
}
}
bool AllowWriteBatching() override { return true; }
private:
TransactionImpl* txn_;
};
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE