From 20778f2f9224cf47481cb18f4b02c560df271ecd Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Mon, 10 Apr 2017 12:37:07 -0700 Subject: [PATCH] Adding comments to the write path Summary: also did minor refactoring Closes https://github.com/facebook/rocksdb/pull/2115 Differential Revision: D4855818 Pulled By: maysamyabandeh fbshipit-source-id: fbca6ac57e5c6677fffe8354f7291e596a50cb77 --- db/db_impl_write.cc | 39 +++++++++++++++++++++++++++------------ db/write_batch.cc | 6 +++--- db/write_thread.cc | 17 +++++++++++++++-- db/write_thread.h | 2 +- include/rocksdb/options.h | 2 ++ 5 files changed, 48 insertions(+), 18 deletions(-) diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index d0755ea226..b2f4c7e3a4 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -117,7 +117,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // when it finds suitable, and finish them in the same write batch. // This is how a write job could be done by the other writer. WriteContext write_context; - WriteThread::Writer* last_writer = &w; + WriteThread::Writer* last_writer = &w; // Dummy intial value autovector write_group; bool logs_getting_synced = false; @@ -127,7 +127,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_dir_sync = need_log_sync && !log_dir_synced_; status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced, &write_context); - uint64_t last_sequence = versions_->LastSequence(); log::Writer* cur_log_writer = logs_.back().writer; mutex_.Unlock(); @@ -145,15 +144,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // Rules for when we can update the memtable concurrently // 1. supported by memtable // 2. Puts are not okay if inplace_update_support - // 3. Deletes or SingleDeletes are not okay if filtering deletes - // (controlled by both batch and memtable setting) - // 4. Merges are not okay + // 3. Merges are not okay // - // Rules 1..3 are enforced by checking the options + // Rules 1..2 are enforced by checking the options // during startup (CheckConcurrentWritesSupported), so if // options.allow_concurrent_memtable_write is true then they can be - // assumed to be true. Rule 4 is checked for each batch. We could - // relax rules 2 and 3 if we could prevent write batches from referring + // assumed to be true. Rule 3 is checked for each batch. We could + // relax rules 2 if we could prevent write batches from referring // more than once to a particular key. bool parallel = immutable_db_options_.allow_concurrent_memtable_write && write_group.size() > 1; @@ -173,6 +170,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } + uint64_t last_sequence = versions_->LastSequence(); const SequenceNumber current_sequence = last_sequence + 1; last_sequence += total_count; @@ -218,7 +216,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, - 0 /*log_number*/, this); + 0 /*recovery_log_number*/, this); if (status.ok()) { // There were no write failures. Set leader's status @@ -236,8 +234,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, std::memory_order_relaxed); write_thread_.LaunchParallelFollowers(&pg, current_sequence); + // Each parallel follower is doing each own writes. The leader should + // also do its own. if (w.ShouldWriteToMemtable()) { - // do leader write ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); assert(w.sequence == current_sequence); @@ -270,8 +269,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. - if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) { - bg_error_ = status; + if (!status.ok() && !w.CallbackFailed()) { + mutex_.Lock(); + if (bg_error_.ok()) { + bg_error_ = status; // stop compaction & fail any further writes + } + mutex_.Unlock(); } } } @@ -341,11 +344,23 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } if (status.ok() && need_log_sync) { + // Wait until the parallel syncs are finished. Any sync process has to sync + // the front log too so it is enough to check the status of front() + // We do a while loop since log_sync_cv_ is signalled when any sync is + // finished + // Note: there does not seem to be a reason to wait for parallel sync at + // this early step but it is not important since parallel sync (SyncWAL) and + // need_log_sync are usually not used together. while (logs_.front().getting_synced) { log_sync_cv_.Wait(); } for (auto& log : logs_) { assert(!log.getting_synced); + // This is just to prevent the logs to be synced by a parallel SyncWAL + // call. We will do the actual syncing later after we will write to the + // WAL. + // Note: there does not seem to be a reason to set this early before we + // actually write to the WAL log.getting_synced = true; } *logs_getting_synced = true; diff --git a/db/write_batch.cc b/db/write_batch.cc index 2270b1ef35..cc295791d5 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -1243,11 +1243,11 @@ public: Status WriteBatchInternal::InsertInto( const autovector& writers, SequenceNumber sequence, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, uint64_t log_number, DB* db, + bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db, bool concurrent_memtable_writes) { MemTableInserter inserter(sequence, memtables, flush_scheduler, - ignore_missing_column_families, log_number, db, - concurrent_memtable_writes); + ignore_missing_column_families, recovery_log_number, + db, concurrent_memtable_writes); for (size_t i = 0; i < writers.size(); i++) { auto w = writers[i]; if (!w->ShouldWriteToMemtable()) { diff --git a/db/write_thread.cc b/db/write_thread.cc index 02943fae5a..26836ef6df 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -5,7 +5,6 @@ #include "db/write_thread.h" #include -#include #include #include "db/column_family.h" #include "port/port.h" @@ -195,7 +194,9 @@ void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) { // debugging and is checked by an assert in WriteImpl w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); } + // Then we are the head of the queue and hence definiltly the leader *linked_as_leader = (writers == nullptr); + // Otherwise we will wait for previous leader to define our status return; } } @@ -223,6 +224,13 @@ void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); if (!linked_as_leader) { + /** + * Wait util: + * 1) An existing leader pick us as the new leader when it finishes + * 2) An exisitng leader pick us as its follewer and + * 2.1) finishes the memtable writes on our behalf + * 2.2) Or tell us to finish the memtable writes it in pralallel + */ AwaitState(w, STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, &ctx); @@ -316,19 +324,22 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg, Writer* w = pg->leader; w->sequence = sequence; + // Initialize and wake up the others while (w != pg->last_writer) { // Writers that won't write don't get sequence allotment if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) { + // There is a sequence number of each written key sequence += WriteBatchInternal::Count(w->batch); } w = w->link_newer; - w->sequence = sequence; + w->sequence = sequence; // sequence number for the first key in the batch w->parallel_group = pg; SetState(w, STATE_PARALLEL_FOLLOWER); } } +// This method is called by both the leader and parallel followers bool WriteThread::CompleteParallelWorker(Writer* w) { static AdaptationContext ctx("CompleteParallelWorker"); @@ -352,6 +363,7 @@ bool WriteThread::CompleteParallelWorker(Writer* w) { } // else we're the last parallel worker + // Errors (if there is any) must be handled by leader before waking up others if (w == leader || (early_exit_allowed && pg->status.ok())) { // this thread should perform exit duties w->status = pg->status; @@ -434,6 +446,7 @@ void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { if (!linked_as_leader) { mu->Unlock(); TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); + // Last leader will not pick us as a follower since our batch is nullptr AwaitState(w, STATE_GROUP_LEADER, &ctx); mu->Lock(); } diff --git a/db/write_thread.h b/db/write_thread.h index 0c3f6fcbc6..cb7b668078 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -90,7 +90,7 @@ class WriteThread { bool made_waitable; // records lazy construction of mutex and cv std::atomic state; // write under StateMutex() or pre-link ParallelGroup* parallel_group; - SequenceNumber sequence; // the sequence number to use + SequenceNumber sequence; // the sequence number to use for the first key Status status; // status of memtable inserter Status callback_status; // status returned by callback->Callback() std::aligned_storage::type state_mutex_bytes; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 60ccace65c..b0ee950a6e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -412,6 +412,8 @@ struct DBOptions { // This parameter should be set to true while storing data to // filesystem like ext3 that can lose files after a reboot. // Default: false + // Note: on many platforms fdatasync is defined as fsync, so this parameter + // would make no difference. Refer to fdatasync definition in this code base. bool use_fsync = false; // A list of paths where SST files can be put into, with its target size.