Add public API `WriteWithCallback` to support custom callbacks (#12603)

Summary:
This PR adds a `DB::WriteWithCallback` API that does the same thing as `DB::Write` while also taking a `UserWriteCallback` argument to execute custom callback functions during the write.

We currently support two types of callback functions: `OnWriteEnqueued` and `OnWalWriteFinish`. The former is invoked after the write is enqueued, and the latter is invoked after the WAL write finishes, when applicable.

These callback functions are intended to help users improve synchronization between concurrent writes. Their execution is on the write's critical path, so they will impact the write's latency if not used properly. The documentation for the callback interface mentions this and suggests that users keep these callback functions' implementations minimal.

Although the transaction interfaces' writes do not yet allow users to specify such a user write callback argument, the `DBImpl::Write*` family of APIs does not differentiate between regular DB writes and writes coming from the transaction layer when it comes to supporting this `UserWriteCallback`. These callbacks work for all the write modes, including the default write mode, `Options.two_write_queues`, `Options.unordered_write`, and `Options.enable_pipelined_write`.
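
To illustrate the intended usage, here is a minimal sketch (not part of this PR; `WalDurableNotifier` and its members are hypothetical names) of a callback that lets another thread wait until a batch is durable in the WAL:

#include <future>

#include "rocksdb/db.h"
#include "rocksdb/user_write_callback.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical callback: signals a future once the WAL write finishes.
// Kept deliberately tiny since it runs on the write's critical path.
// Assumes the write actually goes through the WAL (disableWAL == false);
// otherwise OnWalWriteFinish is never invoked and the future never fires.
class WalDurableNotifier : public UserWriteCallback {
 public:
  void OnWriteEnqueued() override {}  // nothing to do at enqueue time here
  void OnWalWriteFinish() override { wal_done_.set_value(); }
  std::future<void> WalDoneFuture() { return wal_done_.get_future(); }

 private:
  std::promise<void> wal_done_;
};

// Writer thread:
//   WalDurableNotifier notifier;
//   std::future<void> wal_done = notifier.WalDoneFuture();
//   ... hand wal_done off to a waiting thread ...
//   WriteBatch batch;
//   batch.Put("key", "value");
//   Status s = db->WriteWithCallback(WriteOptions(), &batch, &notifier);
// Waiting thread:
//   wal_done.wait();  // wakes up once OnWalWriteFinish has run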

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12603

Test Plan: Added unit test in ./write_callback_test

Reviewed By: anand1976

Differential Revision: D58044638

Pulled By: jowlyzhang

fbshipit-source-id: 87a84a0221df8f589ec8fc4d74597e72ce97e4cd
Authored by Yu Zhang on 2024-05-31 19:30:19 -07:00; committed by Facebook GitHub Bot
Parent: f3b7e959b3
Commit: fc59d8f9c6
12 changed files with 210 additions and 65 deletions

db/db_impl/db_impl.h

@@ -57,6 +57,7 @@
 #include "rocksdb/status.h"
 #include "rocksdb/trace_reader_writer.h"
 #include "rocksdb/transaction_log.h"
+#include "rocksdb/user_write_callback.h"
 #include "rocksdb/utilities/replayer.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/merging_iterator.h"

@@ -231,6 +232,10 @@ class DBImpl : public DB {
   using DB::Write;
   Status Write(const WriteOptions& options, WriteBatch* updates) override;

+  using DB::WriteWithCallback;
+  Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates,
+                           UserWriteCallback* user_write_cb) override;
+
   using DB::Get;
   Status Get(const ReadOptions& _read_options,
              ColumnFamilyHandle* column_family, const Slice& key,

@@ -688,7 +693,8 @@ class DBImpl : public DB {
   // thread to determine whether it is safe to perform the write.
   virtual Status WriteWithCallback(const WriteOptions& write_options,
                                    WriteBatch* my_batch,
-                                   WriteCallback* callback);
+                                   WriteCallback* callback,
+                                   UserWriteCallback* user_write_cb = nullptr);

   // Returns the sequence number that is guaranteed to be smaller than or equal
   // to the sequence number of any key that could be inserted into the current

@@ -1497,6 +1503,7 @@ class DBImpl : public DB {
   // batch that does not have duplicate keys.
   Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                    WriteCallback* callback = nullptr,
+                   UserWriteCallback* user_write_cb = nullptr,
                    uint64_t* log_used = nullptr, uint64_t log_ref = 0,
                    bool disable_memtable = false, uint64_t* seq_used = nullptr,
                    size_t batch_cnt = 0,

@@ -1505,6 +1512,7 @@ class DBImpl : public DB {
   Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
                             WriteCallback* callback = nullptr,
+                            UserWriteCallback* user_write_cb = nullptr,
                             uint64_t* log_used = nullptr, uint64_t log_ref = 0,
                             bool disable_memtable = false,
                             uint64_t* seq_used = nullptr);

@@ -1531,7 +1539,8 @@ class DBImpl : public DB {
   // marks start of a new sub-batch.
   Status WriteImplWALOnly(
       WriteThread* write_thread, const WriteOptions& options,
-      WriteBatch* updates, WriteCallback* callback, uint64_t* log_used,
+      WriteBatch* updates, WriteCallback* callback,
+      UserWriteCallback* user_write_cb, uint64_t* log_used,
       const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
       PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
       const PublishLastSeq publish_last_seq, const bool disable_memtable);

db/db_impl/db_impl_write.cc

@@ -155,21 +155,36 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
   }
   if (s.ok()) {
     s = WriteImpl(write_options, my_batch, /*callback=*/nullptr,
+                  /*user_write_cb=*/nullptr,
                   /*log_used=*/nullptr);
   }
   return s;
 }

 Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
-                                 WriteBatch* my_batch,
-                                 WriteCallback* callback) {
+                                 WriteBatch* my_batch, WriteCallback* callback,
+                                 UserWriteCallback* user_write_cb) {
   Status s;
   if (write_options.protection_bytes_per_key > 0) {
     s = WriteBatchInternal::UpdateProtectionInfo(
         my_batch, write_options.protection_bytes_per_key);
   }
   if (s.ok()) {
-    s = WriteImpl(write_options, my_batch, callback, nullptr);
+    s = WriteImpl(write_options, my_batch, callback, user_write_cb);
+  }
+  return s;
+}
+
+Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
+                                 WriteBatch* my_batch,
+                                 UserWriteCallback* user_write_cb) {
+  Status s;
+  if (write_options.protection_bytes_per_key > 0) {
+    s = WriteBatchInternal::UpdateProtectionInfo(
+        my_batch, write_options.protection_bytes_per_key);
+  }
+  if (s.ok()) {
+    s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, user_write_cb);
   }
   return s;
 }

@@ -179,9 +194,9 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
 // published sequence.
 Status DBImpl::WriteImpl(const WriteOptions& write_options,
                          WriteBatch* my_batch, WriteCallback* callback,
-                         uint64_t* log_used, uint64_t log_ref,
-                         bool disable_memtable, uint64_t* seq_used,
-                         size_t batch_cnt,
+                         UserWriteCallback* user_write_cb, uint64_t* log_used,
+                         uint64_t log_ref, bool disable_memtable,
+                         uint64_t* seq_used, size_t batch_cnt,
                          PreReleaseCallback* pre_release_callback,
                          PostMemTableCallback* post_memtable_callback) {
   assert(!seq_per_batch_ || batch_cnt != 0);

@@ -288,10 +303,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
         seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder;
     // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and
     // they don't consume sequence.
-    return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch,
-                            callback, log_used, log_ref, seq_used, batch_cnt,
-                            pre_release_callback, assign_order,
-                            kDontPublishLastSeq, disable_memtable);
+    return WriteImplWALOnly(
+        &nonmem_write_thread_, write_options, my_batch, callback, user_write_cb,
+        log_used, log_ref, seq_used, batch_cnt, pre_release_callback,
+        assign_order, kDontPublishLastSeq, disable_memtable);
   }

   if (immutable_db_options_.unordered_write) {

@@ -303,9 +318,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     // Use a write thread to i) optimize for WAL write, ii) publish last
     // sequence in in increasing order, iii) call pre_release_callback serially
     Status status = WriteImplWALOnly(
-        &write_thread_, write_options, my_batch, callback, log_used, log_ref,
-        &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder,
-        kDoPublishLastSeq, disable_memtable);
+        &write_thread_, write_options, my_batch, callback, user_write_cb,
+        log_used, log_ref, &seq, sub_batch_cnt, pre_release_callback,
+        kDoAssignOrder, kDoPublishLastSeq, disable_memtable);
     TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
     if (!status.ok()) {
       return status;

@@ -322,14 +337,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   }

   if (immutable_db_options_.enable_pipelined_write) {
-    return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
-                              log_ref, disable_memtable, seq_used);
+    return PipelinedWriteImpl(write_options, my_batch, callback, user_write_cb,
+                              log_used, log_ref, disable_memtable, seq_used);
   }

   PERF_TIMER_GUARD(write_pre_and_post_process_time);
-  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
-                        disable_memtable, batch_cnt, pre_release_callback,
-                        post_memtable_callback);
+  WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
+                        log_ref, disable_memtable, batch_cnt,
+                        pre_release_callback, post_memtable_callback);
   StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

   write_thread_.JoinBatchGroup(&w);

@@ -686,6 +701,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

 Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                                   WriteBatch* my_batch, WriteCallback* callback,
+                                  UserWriteCallback* user_write_cb,
                                   uint64_t* log_used, uint64_t log_ref,
                                   bool disable_memtable, uint64_t* seq_used) {
   PERF_TIMER_GUARD(write_pre_and_post_process_time);

@@ -693,8 +709,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
   WriteContext write_context;

-  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
-                        disable_memtable, /*_batch_cnt=*/0,
+  WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
+                        log_ref, disable_memtable, /*_batch_cnt=*/0,
                         /*_pre_release_callback=*/nullptr);
   write_thread_.JoinBatchGroup(&w);
   TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");

@@ -875,7 +891,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
   StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

-  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
+  WriteThread::Writer w(write_options, my_batch, callback,
+                        /*user_write_cb=*/nullptr, log_ref,
                         false /*disable_memtable*/);

   if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) {

@@ -925,13 +942,15 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
 // applicable in a two-queue setting.
 Status DBImpl::WriteImplWALOnly(
     WriteThread* write_thread, const WriteOptions& write_options,
-    WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used,
+    WriteBatch* my_batch, WriteCallback* callback,
+    UserWriteCallback* user_write_cb, uint64_t* log_used,
     const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
     PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
     const PublishLastSeq publish_last_seq, const bool disable_memtable) {
   PERF_TIMER_GUARD(write_pre_and_post_process_time);
-  WriteThread::Writer w(write_options, my_batch, callback, log_ref,
-                        disable_memtable, sub_batch_cnt, pre_release_callback);
+  WriteThread::Writer w(write_options, my_batch, callback, user_write_cb,
+                        log_ref, disable_memtable, sub_batch_cnt,
+                        pre_release_callback);
   StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE);

   write_thread->JoinBatchGroup(&w);

@@ -1498,6 +1517,11 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
     RecordTick(stats_, WAL_FILE_BYTES, log_size);
     stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
     RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
+    for (auto* writer : write_group) {
+      if (!writer->CallbackFailed()) {
+        writer->CheckPostWalWriteCallback();
+      }
+    }
   }
   return io_s;
 }

@@ -1562,6 +1586,11 @@ IOStatus DBImpl::ConcurrentWriteToWAL(
     stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal,
                       concurrent);
     RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
+    for (auto* writer : write_group) {
+      if (!writer->CallbackFailed()) {
+        writer->CheckPostWalWriteCallback();
+      }
+    }
   }
   return io_s;
 }

db/write_callback_test.cc

@@ -2,8 +2,6 @@
 // This source code is licensed under both the GPLv2 (found in the
 // COPYING file in the root directory) and Apache 2.0 License
 // (found in the LICENSE.Apache file in the root directory).

 #include "db/write_callback.h"

 #include <atomic>

@@ -15,6 +13,7 @@
 #include "db/db_impl/db_impl.h"
 #include "port/port.h"
 #include "rocksdb/db.h"
+#include "rocksdb/user_write_callback.h"
 #include "rocksdb/write_batch.h"
 #include "test_util/sync_point.h"
 #include "test_util/testharness.h"

@@ -84,6 +83,28 @@ class MockWriteCallback : public WriteCallback {
   bool AllowWriteBatching() override { return allow_batching_; }
 };

+class MockUserWriteCallback : public UserWriteCallback {
+ public:
+  std::atomic<bool> write_enqueued_{false};
+  std::atomic<bool> wal_write_done_{false};
+
+  MockUserWriteCallback() = default;
+
+  MockUserWriteCallback(const MockUserWriteCallback& other) {
+    write_enqueued_.store(other.write_enqueued_.load());
+    wal_write_done_.store(other.wal_write_done_.load());
+  }
+
+  void OnWriteEnqueued() override { write_enqueued_.store(true); }
+
+  void OnWalWriteFinish() override { wal_write_done_.store(true); }
+
+  void Reset() {
+    write_enqueued_.store(false);
+    wal_write_done_.store(false);
+  }
+};
+
 #if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
 class WriteCallbackPTest
     : public WriteCallbackTest,

@@ -119,9 +140,11 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
     kvs_.clear();
     write_batch_.Clear();
     callback_.was_called_.store(false);
+    user_write_cb_.Reset();
   }

   MockWriteCallback callback_;
+  MockUserWriteCallback user_write_cb_;
   WriteBatch write_batch_;
   std::vector<std::pair<string, string>> kvs_;
 };

@@ -327,18 +350,26 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
           ASSERT_OK(WriteBatchInternal::InsertNoop(&write_op.write_batch_));
           const size_t ONE_BATCH = 1;
           s = db_impl->WriteImpl(woptions, &write_op.write_batch_,
-                                 &write_op.callback_, nullptr, 0, false, nullptr,
-                                 ONE_BATCH,
+                                 &write_op.callback_, &write_op.user_write_cb_,
+                                 nullptr, 0, false, nullptr, ONE_BATCH,
                                  two_queues_ ? &publish_seq_callback : nullptr);
         } else {
           s = db_impl->WriteWithCallback(woptions, &write_op.write_batch_,
-                                         &write_op.callback_);
+                                         &write_op.callback_,
+                                         &write_op.user_write_cb_);
         }

+        ASSERT_TRUE(write_op.user_write_cb_.write_enqueued_.load());
         if (write_op.callback_.should_fail_) {
           ASSERT_TRUE(s.IsBusy());
+          ASSERT_FALSE(write_op.user_write_cb_.wal_write_done_.load());
         } else {
           ASSERT_OK(s);
+          if (enable_WAL_) {
+            ASSERT_TRUE(write_op.user_write_cb_.wal_write_done_.load());
+          } else {
+            ASSERT_FALSE(write_op.user_write_cb_.wal_write_done_.load());
+          }
         }
       };

@@ -440,6 +471,16 @@ TEST_F(WriteCallbackTest, WriteCallBackTest) {
   ASSERT_OK(s);
   ASSERT_EQ("value.a2", value);

+  MockUserWriteCallback user_write_cb;
+  WriteBatch wb4;
+  ASSERT_OK(wb4.Put("a", "value.a4"));
+  ASSERT_OK(db->WriteWithCallback(write_options, &wb4, &user_write_cb));
+  ASSERT_OK(db->Get(read_options, "a", &value));
+  ASSERT_EQ(value, "value.a4");
+  ASSERT_TRUE(user_write_cb.write_enqueued_.load());
+  ASSERT_TRUE(user_write_cb.wal_write_done_.load());
+
   delete db;
   ASSERT_OK(DestroyDB(dbname, options));
 }

db/write_thread.cc

@@ -404,6 +404,8 @@ void WriteThread::JoinBatchGroup(Writer* w) {
   bool linked_as_leader = LinkOne(w, &newest_writer_);

+  w->CheckWriteEnqueuedCallback();
+
   if (linked_as_leader) {
     SetState(w, STATE_GROUP_LEADER);
   }

db/write_thread.h

@@ -22,6 +22,7 @@
 #include "rocksdb/options.h"
 #include "rocksdb/status.h"
 #include "rocksdb/types.h"
+#include "rocksdb/user_write_callback.h"
 #include "rocksdb/write_batch.h"
 #include "util/aligned_storage.h"
 #include "util/autovector.h"

@@ -134,6 +135,7 @@ class WriteThread {
     uint64_t log_used;  // log number that this batch was inserted into
     uint64_t log_ref;   // log number that memtable insert should reference
     WriteCallback* callback;
+    UserWriteCallback* user_write_cb;
     bool made_waitable;          // records lazy construction of mutex and cv
     std::atomic<uint8_t> state;  // write under StateMutex() or pre-link
     WriteGroup* write_group;

@@ -160,6 +162,7 @@ class WriteThread {
           log_used(0),
           log_ref(0),
           callback(nullptr),
+          user_write_cb(nullptr),
           made_waitable(false),
           state(STATE_INIT),
           write_group(nullptr),

@@ -168,8 +171,8 @@ class WriteThread {
           link_newer(nullptr) {}

     Writer(const WriteOptions& write_options, WriteBatch* _batch,
-           WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
-           size_t _batch_cnt = 0,
+           WriteCallback* _callback, UserWriteCallback* _user_write_cb,
+           uint64_t _log_ref, bool _disable_memtable, size_t _batch_cnt = 0,
            PreReleaseCallback* _pre_release_callback = nullptr,
            PostMemTableCallback* _post_memtable_callback = nullptr)
         : batch(_batch),

@@ -187,6 +190,7 @@ class WriteThread {
           log_used(0),
           log_ref(_log_ref),
           callback(_callback),
+          user_write_cb(_user_write_cb),
           made_waitable(false),
           state(STATE_INIT),
           write_group(nullptr),

@@ -210,6 +214,18 @@ class WriteThread {
       return callback_status.ok();
     }

+    void CheckWriteEnqueuedCallback() {
+      if (user_write_cb != nullptr) {
+        user_write_cb->OnWriteEnqueued();
+      }
+    }
+
+    void CheckPostWalWriteCallback() {
+      if (user_write_cb != nullptr) {
+        user_write_cb->OnWalWriteFinish();
+      }
+    }
+
     void CreateMutex() {
       if (!made_waitable) {
         // Note that made_waitable is tracked separately from state

include/rocksdb/db.h

@@ -28,6 +28,7 @@
 #include "rocksdb/thread_status.h"
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/types.h"
+#include "rocksdb/user_write_callback.h"
 #include "rocksdb/version.h"
 #include "rocksdb/wide_columns.h"

@@ -583,6 +584,15 @@ class DB {
   // Note: consider setting options.sync = true.
   virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

+  // Same as DB::Write, and takes a `UserWriteCallback` argument to allow
+  // users to plug in custom logic in callback functions during the write.
+  virtual Status WriteWithCallback(const WriteOptions& /*options*/,
+                                   WriteBatch* /*updates*/,
+                                   UserWriteCallback* /*user_write_cb*/) {
+    return Status::NotSupported(
+        "WriteWithCallback not implemented for this interface.");
+  }
+
   // If the column family specified by "column_family" contains an entry for
   // "key", return the corresponding value in "*value". If the entry is a plain
   // key-value, return the value as-is; if it is a wide-column entity, return

include/rocksdb/user_write_callback.h (new file)

@@ -0,0 +1,29 @@
+// Copyright (c) Meta Platforms, Inc. and affiliates.
+//
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include "rocksdb/status.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Custom callback functions to support users to plug in logic while data is
+// being written to the DB. It's intended for better synchronization between
+// concurrent writes. Note that these callbacks are on the write's critical
+// path; keep them fast and minimal so as not to affect write latency. They
+// may be called in the context of a different thread.
+class UserWriteCallback {
+ public:
+  virtual ~UserWriteCallback() {}
+
+  // This function will be called after the write is enqueued.
+  virtual void OnWriteEnqueued() = 0;
+
+  // This function will be called after wal write finishes if it applies.
+  virtual void OnWalWriteFinish() = 0;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
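
As the `WriteThread` and `DBImpl` changes above show, `OnWriteEnqueued` is invoked from `WriteThread::JoinBatchGroup` right after the writer is linked into the write queue, and `OnWalWriteFinish` is invoked from `DBImpl::WriteToWAL` / `DBImpl::ConcurrentWriteToWAL` only for writers whose `WriteCallback` did not fail; it therefore never fires for failed writes or when the WAL is disabled, which is what the tests above assert.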

utilities/transactions/pessimistic_transaction.cc

@@ -638,9 +638,9 @@ Status WriteCommittedTxn::PrepareInternal() {
   SequenceNumber* const KIgnoreSeqUsed = nullptr;
   const size_t kNoBatchCount = 0;
   s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
-                          kNoWriteCallback, &log_number_, kRefNoLog,
-                          kDisableMemtable, KIgnoreSeqUsed, kNoBatchCount,
-                          &mark_log_callback);
+                          kNoWriteCallback, /*user_write_cb=*/nullptr,
+                          &log_number_, kRefNoLog, kDisableMemtable,
+                          KIgnoreSeqUsed, kNoBatchCount, &mark_log_callback);
   return s;
 }

@@ -773,10 +773,10 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
       post_mem_cb = &snapshot_creation_cb;
     }
   }
-  auto s = db_impl_->WriteImpl(write_options_, wb,
-                               /*callback*/ nullptr, /*log_used*/ nullptr,
-                               /*log_ref*/ 0, /*disable_memtable*/ false,
-                               &seq_used, /*batch_cnt=*/0,
-                               /*pre_release_callback=*/nullptr, post_mem_cb);
+  auto s = db_impl_->WriteImpl(
+      write_options_, wb,
+      /*callback*/ nullptr, /*user_write_cb=*/nullptr, /*log_used*/ nullptr,
+      /*log_ref*/ 0, /*disable_memtable*/ false, &seq_used, /*batch_cnt=*/0,
+      /*pre_release_callback=*/nullptr, post_mem_cb);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (s.ok()) {

@@ -788,6 +788,7 @@ Status WriteCommittedTxn::CommitWithoutPrepareInternal() {
 Status WriteCommittedTxn::CommitBatchInternal(WriteBatch* batch, size_t) {
   uint64_t seq_used = kMaxSequenceNumber;
   auto s = db_impl_->WriteImpl(write_options_, batch, /*callback*/ nullptr,
+                               /*user_write_cb=*/nullptr,
                                /*log_used*/ nullptr, /*log_ref*/ 0,
                                /*disable_memtable*/ false, &seq_used);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);

@@ -861,6 +862,7 @@ Status WriteCommittedTxn::CommitInternal() {
     }
   }
   s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr,
+                          /*user_write_cb=*/nullptr,
                          /*log_used*/ nullptr, /*log_ref*/ log_number_,
                          /*disable_memtable*/ false, &seq_used,
                          /*batch_cnt=*/0, /*pre_release_callback=*/nullptr,

utilities/transactions/write_prepared_txn.cc

@@ -154,8 +154,9 @@ Status WritePreparedTxn::PrepareInternal() {
   const bool DISABLE_MEMTABLE = true;
   uint64_t seq_used = kMaxSequenceNumber;
   s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
-                          /*callback*/ nullptr, &log_number_, /*log ref*/ 0,
-                          !DISABLE_MEMTABLE, &seq_used, prepare_batch_cnt_,
+                          /*callback*/ nullptr, /*user_write_cb=*/nullptr,
+                          &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
+                          &seq_used, prepare_batch_cnt_,
                           &add_prepared_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   auto prepare_seq = seq_used;

@@ -247,9 +248,10 @@ Status WritePreparedTxn::CommitInternal() {
   // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
   // true. See the comments about GetCommitTimeWriteBatch() in
   // include/rocksdb/utilities/transaction.h.
-  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
-                          zero_log_number, disable_memtable, &seq_used,
-                          batch_cnt, pre_release_callback);
+  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr,
+                          /*user_write_cb=*/nullptr, nullptr, zero_log_number,
+                          disable_memtable, &seq_used, batch_cnt,
+                          pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   const SequenceNumber commit_batch_seq = seq_used;
   if (LIKELY(do_one_write || !s.ok())) {

@@ -284,8 +286,9 @@ Status WritePreparedTxn::CommitInternal() {
     const bool DISABLE_MEMTABLE = true;
     const size_t ONE_BATCH = 1;
     const uint64_t NO_REF_LOG = 0;
-    s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                            NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+    s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
+                            /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
+                            DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                             &update_commit_map_with_aux_batch);
     assert(!s.ok() || seq_used != kMaxSequenceNumber);
     return s;

@@ -450,8 +453,9 @@ Status WritePreparedTxn::RollbackInternal() {
   // DB in one shot. min_uncommitted still works since it requires capturing
   // data that is written to DB but not yet committed, while
   // the rollback batch commits with PreReleaseCallback.
-  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr, nullptr,
-                          NO_REF_LOG, !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr,
+                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
+                          !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (!s.ok()) {

@@ -476,8 +480,9 @@ Status WritePreparedTxn::RollbackInternal() {
   // In the absence of Prepare markers, use Noop as a batch separator
   s = WriteBatchInternal::InsertNoop(&empty_batch);
   assert(s.ok());
-  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
+  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
+                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
+                          DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
                           &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,

utilities/transactions/write_prepared_txn_db.cc

@@ -213,8 +213,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
   } else {
     pre_release_callback = &add_prepared_callback;
   }
-  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, no_log_ref,
-                          !DISABLE_MEMTABLE, &seq_used, batch_cnt,
+  s = db_impl_->WriteImpl(write_options, batch, nullptr, nullptr, nullptr,
+                          no_log_ref, !DISABLE_MEMTABLE, &seq_used, batch_cnt,
                           pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   uint64_t prepare_seq = seq_used;

@@ -240,8 +240,8 @@ Status WritePreparedTxnDB::WriteInternal(const WriteOptions& write_options_orig,
   write_options.sync = false;
   const size_t ONE_BATCH = 1;  // Just to inc the seq
   s = db_impl_->WriteImpl(write_options, &empty_batch, nullptr, nullptr,
-                          no_log_ref, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          &update_commit_map_with_prepare);
+                          nullptr, no_log_ref, DISABLE_MEMTABLE, &seq_used,
+                          ONE_BATCH, &update_commit_map_with_prepare);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Note: RemovePrepared is called from within PreReleaseCallback
   return s;

utilities/transactions/write_unprepared_txn.cc

@@ -378,7 +378,8 @@ Status WriteUnpreparedTxn::FlushWriteBatchToDBInternal(bool prepared) {
   // WriteImpl should not overwrite that value, so set log_used to nullptr if
   // log_number_ is already set.
   s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
-                          /*callback*/ nullptr, &last_log_number_,
+                          /*callback*/ nullptr, /*user_write_cb=*/nullptr,
+                          &last_log_number_,
                           /*log ref*/ 0, !DISABLE_MEMTABLE, &seq_used,
                           prepare_batch_cnt_, &add_prepared_callback);
   if (log_number_ == 0) {

@@ -595,7 +596,7 @@ Status WriteUnpreparedTxn::CommitInternal() {
   const uint64_t zero_log_number = 0ull;
   size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
   s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
-                          zero_log_number, disable_memtable, &seq_used,
+                          nullptr, zero_log_number, disable_memtable, &seq_used,
                           batch_cnt, pre_release_callback);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   const SequenceNumber commit_batch_seq = seq_used;

@@ -639,8 +640,8 @@ Status WriteUnpreparedTxn::CommitInternal() {
     const size_t ONE_BATCH = 1;
     const uint64_t NO_REF_LOG = 0;
     s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                            NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                            &update_commit_map_with_commit_batch);
+                            nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used,
+                            ONE_BATCH, &update_commit_map_with_commit_batch);
     assert(!s.ok() || seq_used != kMaxSequenceNumber);
     // Note RemovePrepared should be called after WriteImpl that publishsed the
     // seq. Otherwise SmallestUnCommittedSeq optimization breaks.

@@ -771,8 +772,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
   // data that is written to DB but not yet committed, while the rollback
   // batch commits with PreReleaseCallback.
   s = db_impl_->WriteImpl(write_options_, rollback_batch.GetWriteBatch(),
-                          nullptr, nullptr, NO_REF_LOG, !DISABLE_MEMTABLE,
-                          &seq_used, rollback_batch_cnt,
+                          nullptr, nullptr, nullptr, NO_REF_LOG,
+                          !DISABLE_MEMTABLE, &seq_used, rollback_batch_cnt,
                           do_one_write ? &update_commit_map : nullptr);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   if (!s.ok()) {

@@ -807,8 +808,8 @@ Status WriteUnpreparedTxn::RollbackInternal() {
   s = WriteBatchInternal::InsertNoop(&empty_batch);
   assert(s.ok());
   s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr, nullptr,
-                          NO_REF_LOG, DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
-                          &update_commit_map_with_rollback_batch);
+                          nullptr, NO_REF_LOG, DISABLE_MEMTABLE, &seq_used,
+                          ONE_BATCH, &update_commit_map_with_rollback_batch);
   assert(!s.ok() || seq_used != kMaxSequenceNumber);
   // Mark the txn as rolled back
   if (s.ok()) {

utilities/transactions/write_unprepared_txn_db.cc

@@ -180,7 +180,8 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
   const size_t kOneBatch = 1;
   uint64_t seq_used = kMaxSequenceNumber;
   s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
-                          kNoLogRef, !kDisableMemtable, &seq_used, kOneBatch);
+                          nullptr, kNoLogRef, !kDisableMemtable, &seq_used,
+                          kOneBatch);
   if (!s.ok()) {
     return s;
   }