diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index ca227c99e8..c8a18562fe 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -1098,8 +1098,10 @@ class DBImpl : public DB {
   // Called from WriteBufferManager. This function changes the state_
   // to State::RUNNING indicating the stall is cleared and DB can proceed.
   void Signal() override {
-    MutexLock lock(&state_mutex_);
-    state_ = State::RUNNING;
+    {
+      MutexLock lock(&state_mutex_);
+      state_ = State::RUNNING;
+    }
     state_cv_.Signal();
   }
 
diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h
index add957d847..a5471aabd2 100644
--- a/include/rocksdb/write_buffer_manager.h
+++ b/include/rocksdb/write_buffer_manager.h
@@ -85,9 +85,7 @@ class WriteBufferManager {
     buffer_size_.store(new_size, std::memory_order_relaxed);
     mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
     // Check if stall is active and can be ended.
-    if (allow_stall_) {
-      EndWriteStall();
-    }
+    MaybeEndWriteStall();
   }
 
   // Below functions should be called by RocksDB internally.
@@ -118,17 +116,12 @@ class WriteBufferManager {
   // pass allow_stall = true during WriteBufferManager instance creation.
   //
   // Should only be called by RocksDB internally.
-  bool ShouldStall() {
-    if (allow_stall_ && enabled()) {
-      if (IsStallActive()) {
-        return true;
-      }
-      if (IsStallThresholdExceeded()) {
-        stall_active_.store(true, std::memory_order_relaxed);
-        return true;
-      }
+  bool ShouldStall() const {
+    if (!allow_stall_ || !enabled()) {
+      return false;
     }
-    return false;
+
+    return IsStallActive() || IsStallThresholdExceeded();
   }
 
   // Returns true if stall is active.
@@ -137,7 +130,9 @@ class WriteBufferManager {
   }
 
   // Returns true if stalling condition is met.
-  bool IsStallThresholdExceeded() { return memory_usage() >= buffer_size_; }
+  bool IsStallThresholdExceeded() const {
+    return memory_usage() >= buffer_size_;
+  }
 
   void ReserveMem(size_t mem);
 
@@ -151,8 +146,9 @@ class WriteBufferManager {
   // Should only be called by RocksDB internally.
   void BeginWriteStall(StallInterface* wbm_stall);
 
-  // Remove DB instances from queue and signal them to continue.
-  void EndWriteStall();
+  // If stall conditions have resolved, remove DB instances from queue and
+  // signal them to continue.
+  void MaybeEndWriteStall();
 
   void RemoveDBFromQueue(StallInterface* wbm_stall);
 
@@ -167,9 +163,11 @@ class WriteBufferManager {
   std::mutex cache_rev_mng_mu_;
 
   std::list<StallInterface*> queue_;
-  // Protects the queue_
+  // Protects the queue_ and stall_active_.
   std::mutex mu_;
   bool allow_stall_;
+  // Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
+  // while holding mu_, but it can be read without a lock.
   std::atomic<bool> stall_active_;
 
   void ReserveMemWithCache(size_t mem);
diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc
index c599b658c1..48278aaaf3 100644
--- a/memtable/write_buffer_manager.cc
+++ b/memtable/write_buffer_manager.cc
@@ -39,7 +39,12 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
 #endif  // ROCKSDB_LITE
 }
 
-WriteBufferManager::~WriteBufferManager() = default;
+WriteBufferManager::~WriteBufferManager() {
+#ifndef NDEBUG
+  std::unique_lock<std::mutex> lock(mu_);
+  assert(queue_.empty());
+#endif
+}
 
 std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
   if (cache_rev_mng_ != nullptr) {
@@ -98,9 +103,7 @@ void WriteBufferManager::FreeMem(size_t mem) {
     memory_used_.fetch_sub(mem, std::memory_order_relaxed);
   }
   // Check if stall is active and can be ended.
-  if (allow_stall_) {
-    EndWriteStall();
-  }
+  MaybeEndWriteStall();
 }
 
 void WriteBufferManager::FreeMemWithCache(size_t mem) {
@@ -127,47 +130,74 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
 
 void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
   assert(wbm_stall != nullptr);
-  if (wbm_stall) {
+  assert(allow_stall_);
+
+  // Allocate outside of the lock.
+  std::list<StallInterface*> new_node = {wbm_stall};
+
+  {
     std::unique_lock<std::mutex> lock(mu_);
-    queue_.push_back(wbm_stall);
+    // Verify if the stall conditions are still active.
+    if (ShouldStall()) {
+      stall_active_.store(true, std::memory_order_relaxed);
+      queue_.splice(queue_.end(), std::move(new_node));
+    }
   }
-  // In case thread enqueue itself and memory got freed in parallel, end the
-  // stall.
-  if (!ShouldStall()) {
-    EndWriteStall();
+
+  // If the node was not consumed, the stall has ended already and we can signal
+  // the caller.
+  if (!new_node.empty()) {
+    new_node.front()->Signal();
   }
 }
 
-// Called when memory is freed in FreeMem.
-void WriteBufferManager::EndWriteStall() {
-  if (enabled() && !IsStallThresholdExceeded()) {
-    {
-      std::unique_lock<std::mutex> lock(mu_);
-      stall_active_.store(false, std::memory_order_relaxed);
-      if (queue_.empty()) {
-        return;
-      }
-    }
-
-    // Get the instances from the list and call WBMStallInterface::Signal to
-    // change the state to running and unblock the DB instances.
-    // Check ShouldStall() incase stall got active by other DBs.
-    while (!ShouldStall() && !queue_.empty()) {
-      std::unique_lock<std::mutex> lock(mu_);
-      StallInterface* wbm_stall = queue_.front();
-      queue_.pop_front();
-      wbm_stall->Signal();
-    }
+// Called when memory is freed in FreeMem or the buffer size has changed.
+void WriteBufferManager::MaybeEndWriteStall() {
+  // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
+  // the writers.
+  if (!allow_stall_) {
+    return;
   }
+
+  if (IsStallThresholdExceeded()) {
+    return;  // Stall conditions have not resolved.
+  }
+
+  // Perform all deallocations outside of the lock.
+  std::list<StallInterface*> cleanup;
+
+  std::unique_lock<std::mutex> lock(mu_);
+  if (!stall_active_.load(std::memory_order_relaxed)) {
+    return;  // Nothing to do.
+  }
+
+  // Unblock new writers.
+  stall_active_.store(false, std::memory_order_relaxed);
+
+  // Unblock the writers in the queue.
+  for (StallInterface* wbm_stall : queue_) {
+    wbm_stall->Signal();
+  }
+  cleanup = std::move(queue_);
 }
 
 void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
   assert(wbm_stall != nullptr);
+
+  // Deallocate the removed nodes outside of the lock.
+  std::list<StallInterface*> cleanup;
+
   if (enabled() && allow_stall_) {
     std::unique_lock<std::mutex> lock(mu_);
-    queue_.remove(wbm_stall);
-    wbm_stall->Signal();
+    for (auto it = queue_.begin(); it != queue_.end();) {
+      auto next = std::next(it);
+      if (*it == wbm_stall) {
+        cleanup.splice(cleanup.end(), queue_, std::move(it));
+      }
+      it = next;
+    }
   }
+  wbm_stall->Signal();
 }
 
 }  // namespace ROCKSDB_NAMESPACE
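
Note on the locking pattern (commentary, not part of the patch): the DBImpl::Signal() hunk above scopes the mutex so that state_cv_.Signal() runs only after state_mutex_ has been released; signaling while still holding the mutex would wake the stalled writer just to have it block again on the lock. Below is a minimal, self-contained sketch of that handshake using std::condition_variable in place of RocksDB's port abstractions; all names here (MiniStall, Block, etc.) are illustrative, not RocksDB APIs.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Toy stand-in for the DBImpl stall state machine.
class MiniStall {
 public:
  // Writer side: blocks until Signal() flips the state to "running".
  void Block() {
    std::unique_lock<std::mutex> lock(state_mutex_);
    state_cv_.wait(lock, [this] { return running_; });
  }

  // Manager side: mirrors the scoped-lock shape of the patched
  // DBImpl::Signal().
  void Signal() {
    {
      // Update the state under the mutex...
      std::lock_guard<std::mutex> lock(state_mutex_);
      running_ = true;
    }
    // ...but notify after releasing it, so the woken writer does not
    // immediately contend on state_mutex_. No wakeup is lost: the state
    // change happens before the notify, and the waiter re-checks the
    // predicate under the lock.
    state_cv_.notify_one();
  }

 private:
  std::mutex state_mutex_;
  std::condition_variable state_cv_;
  bool running_ = false;
};

int main() {
  MiniStall stall;
  std::thread writer([&] {
    stall.Block();  // Simulates a write stalled by the WriteBufferManager.
    std::cout << "writer resumed\n";
  });
  stall.Signal();  // Simulates MaybeEndWriteStall() clearing the stall.
  writer.join();
  return 0;
}

The sketch also suggests why MaybeEndWriteStall() can safely call Signal() on the queued StallInterface pointers while holding mu_: a stalled writer waits on its own state_mutex_/state_cv_ pair, never on mu_, so the two locks are not acquired in opposite orders.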