mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 13:41:46 +00:00
ca0ef54f16
Summary: **Context:** WAL flush is currently not rate-limited by `Options::rate_limiter`. This PR is to provide rate-limiting to auto WAL flush, the one that automatically happen after each user write operation (i.e, `Options::manual_wal_flush == false`), by adding `WriteOptions::rate_limiter_options`. Note that we are NOT rate-limiting WAL flush that do NOT automatically happen after each user write, such as `Options::manual_wal_flush == true + manual FlushWAL()` (rate-limiting multiple WAL flushes), for the benefits of: - being consistent with [ReadOptions::rate_limiter_priority](https://github.com/facebook/rocksdb/blob/7.0.fb/include/rocksdb/options.h#L515) - being able to turn off some WAL flush's rate-limiting but not all (e.g, turn off specific the WAL flush of a critical user write like a service's heartbeat) `WriteOptions::rate_limiter_options` only accept `Env::IO_USER` and `Env::IO_TOTAL` currently due to an implementation constraint. - The constraint is that we currently queue parallel writes (including WAL writes) based on FIFO policy which does not factor rate limiter priority into this layer's scheduling. If we allow lower priorities such as `Env::IO_HIGH/MID/LOW` and such writes specified with lower priorities occurs before ones specified with higher priorities (even just by a tiny bit in arrival time), the former would have blocked the latter, leading to a "priority inversion" issue and contradictory to what we promise for rate-limiting priority. Therefore we only allow `Env::IO_USER` and `Env::IO_TOTAL` right now before improving that scheduling. A pre-requisite to this feature is to support operation-level rate limiting in `WritableFileWriter`, which is also included in this PR. **Summary:** - Renamed test suite `DBRateLimiterTest to DBRateLimiterOnReadTest` for adding a new test suite - Accept `rate_limiter_priority` in `WritableFileWriter`'s private and public write functions - Passed `WriteOptions::rate_limiter_options` to `WritableFileWriter` in the path of automatic WAL flush. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9607 Test Plan: - Added new unit test to verify existing flush/compaction rate-limiting does not break, since `DBTest, RateLimitingTest` is disabled and current db-level rate-limiting tests focus on read only (e.g, `db_rate_limiter_test`, `DBTest2, RateLimitedCompactionReads`). - Added new unit test `DBRateLimiterOnWriteWALTest, AutoWalFlush` - `strace -ftt -e trace=write ./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -rate_limit_auto_wal_flush=1 -rate_limiter_bytes_per_sec=15 -rate_limiter_refill_period_us=1000000 -write_buffer_size=100000000 -disable_auto_compactions=1 -num=100` - verified that WAL flush(i.e, system-call _write_) were chunked into 15 bytes and each _write_ was roughly 1 second apart - verified the chunking disappeared when `-rate_limit_auto_wal_flush=0` - crash test: `python3 tools/db_crashtest.py blackbox --disable_wal=0 --rate_limit_auto_wal_flush=1 --rate_limiter_bytes_per_sec=10485760 --interval=10` killed as normal **Benchmarked on flush/compaction to ensure no performance regression:** - compaction with rate-limiting (see table 1, avg over 1280-run): pre-change: **915635 micros/op**; post-change: **907350 micros/op (improved by 0.106%)** ``` #!/bin/bash TEST_TMPDIR=/dev/shm/testdb START=1 NUM_DATA_ENTRY=8 N=10 rm -f compact_bmk_output.txt compact_bmk_output_2.txt dont_care_output.txt for i in $(eval echo "{$START..$NUM_DATA_ENTRY}") do NUM_RUN=$(($N*(2**($i-1)))) for j in $(eval echo "{$START..$NUM_RUN}") do ./db_bench --benchmarks=fillrandom -db=$TEST_TMPDIR -disable_auto_compactions=1 -write_buffer_size=6710886 > dont_care_output.txt && ./db_bench --benchmarks=compact -use_existing_db=1 -db=$TEST_TMPDIR -level0_file_num_compaction_trigger=1 -rate_limiter_bytes_per_sec=100000000 | egrep 'compact' done > compact_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' compact_bmk_output.txt >> compact_bmk_output_2.txt done ``` - compaction w/o rate-limiting (see table 2, avg over 640-run): pre-change: **822197 micros/op**; post-change: **823148 micros/op (regressed by 0.12%)** ``` Same as above script, except that -rate_limiter_bytes_per_sec=0 ``` - flush with rate-limiting (see table 3, avg over 320-run, run on the [patch](ee5c6023a9
) to augment current db_bench ): pre-change: **745752 micros/op**; post-change: **745331 micros/op (regressed by 0.06 %)** ``` #!/bin/bash TEST_TMPDIR=/dev/shm/testdb START=1 NUM_DATA_ENTRY=8 N=10 rm -f flush_bmk_output.txt flush_bmk_output_2.txt for i in $(eval echo "{$START..$NUM_DATA_ENTRY}") do NUM_RUN=$(($N*(2**($i-1)))) for j in $(eval echo "{$START..$NUM_RUN}") do ./db_bench -db=$TEST_TMPDIR -write_buffer_size=1048576000 -num=1000000 -rate_limiter_bytes_per_sec=100000000 -benchmarks=fillseq,flush | egrep 'flush' done > flush_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' flush_bmk_output.txt >> flush_bmk_output_2.txt done ``` - flush w/o rate-limiting (see table 4, avg over 320-run, run on the [patch](ee5c6023a9
) to augment current db_bench): pre-change: **487512 micros/op**, post-change: **485856 micors/ops (improved by 0.34%)** ``` Same as above script, except that -rate_limiter_bytes_per_sec=0 ``` | table 1 - compact with rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 896978 | 16046.9 | 901242 | 15670.9 | 0.475373978 20 | 893718 | 15813 | 886505 | 17544.7 | -0.8070778478 40 | 900426 | 23882.2 | 894958 | 15104.5 | -0.6072681153 80 | 906635 | 21761.5 | 903332 | 23948.3 | -0.3643141948 160 | 898632 | 21098.9 | 907583 | 21145 | 0.9960695813 3.20E+02 | 905252 | 22785.5 | 908106 | 25325.5 | 0.3152713278 6.40E+02 | 905213 | 23598.6 | 906741 | 21370.5 | 0.1688000504 **1.28E+03** | **908316** | **23533.1** | **907350** | **24626.8** | **-0.1063506533** average over #-run | 901896.25 | 21064.9625 | 901977.125 | 20592.025 | 0.008967217682 | table 2 - compact w/o rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 811211 | 26996.7 | 807586 | 28456.4 | -0.4468627768 20 | 815465 | 14803.7 | 814608 | 28719.7 | -0.105093413 40 | 809203 | 26187.1 | 797835 | 25492.1 | -1.404839082 80 | 822088 | 28765.3 | 822192 | 32840.4 | 0.01265071379 160 | 821719 | 36344.7 | 821664 | 29544.9 | -0.006693285661 3.20E+02 | 820921 | 27756.4 | 821403 | 28347.7 | 0.05871454135 **6.40E+02** | **822197** | **28960.6** | **823148** | **30055.1** | **0.1156657103** average over #-run | 8.18E+05 | 2.71E+04 | 8.15E+05 | 2.91E+04 | -0.25 | table 3 - flush with rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 741721 | 11770.8 | 740345 | 5949.76 | -0.1855144994 20 | 735169 | 3561.83 | 743199 | 9755.77 | 1.09226586 40 | 743368 | 8891.03 | 742102 | 8683.22 | -0.1703059588 80 | 742129 | 8148.51 | 743417 | 9631.58| 0.1735547324 160 | 749045 | 9757.21 | 746256 | 9191.86 | -0.3723407806 **3.20E+02** | **745752** | **9819.65** | **745331** | **9840.62** | **-0.0564530836** 6.40E+02 | 749006 | 11080.5 | 748173 | 10578.7 | -0.1112140624 average over #-run | 743741.4286 | 9004.218571 | 744117.5714 | 9090.215714 | 0.05057441238 | table 4 - flush w/o rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 477283 | 24719.6 | 473864 | 12379 | -0.7163464863 20 | 486743 | 20175.2 | 502296 | 23931.3 | 3.195320734 40 | 482846 | 15309.2 | 489820 | 22259.5 | 1.444352858 80 | 491490 | 21883.1 | 490071 | 23085.7 | -0.2887139108 160 | 493347 | 28074.3 | 483609 | 21211.7 | -1.973864238 **3.20E+02** | **487512** | **21401.5** | **485856** | **22195.2** | **-0.3396839462** 6.40E+02 | 490307 | 25418.6 | 485435 | 22405.2 | -0.9936631539 average over #-run | 4.87E+05 | 2.24E+04 | 4.87E+05 | 2.11E+04 | 0.00E+00 Reviewed By: ajkr Differential Revision: D34442441 Pulled By: hx235 fbshipit-source-id: 4790f13e1e5c0a95ae1d1cc93ffcf69dc6e78bdd
440 lines
16 KiB
C++
440 lines
16 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#pragma once
|
|
|
|
#include <atomic>
|
|
#include <cassert>
|
|
#include <chrono>
|
|
#include <condition_variable>
|
|
#include <cstdint>
|
|
#include <mutex>
|
|
#include <type_traits>
|
|
#include <vector>
|
|
|
|
#include "db/dbformat.h"
|
|
#include "db/pre_release_callback.h"
|
|
#include "db/write_callback.h"
|
|
#include "monitoring/instrumented_mutex.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/status.h"
|
|
#include "rocksdb/types.h"
|
|
#include "rocksdb/write_batch.h"
|
|
#include "util/autovector.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class WriteThread {
|
|
public:
|
|
enum State : uint8_t {
|
|
// The initial state of a writer. This is a Writer that is
|
|
// waiting in JoinBatchGroup. This state can be left when another
|
|
// thread informs the waiter that it has become a group leader
|
|
// (-> STATE_GROUP_LEADER), when a leader that has chosen to be
|
|
// non-parallel informs a follower that its writes have been committed
|
|
// (-> STATE_COMPLETED), or when a leader that has chosen to perform
|
|
// updates in parallel and needs this Writer to apply its batch (->
|
|
// STATE_PARALLEL_MEMTABLE_WRITER).
|
|
STATE_INIT = 1,
|
|
|
|
// The state used to inform a waiting Writer that it has become the
|
|
// leader, and it should now build a write batch group. Tricky:
|
|
// this state is not used if newest_writer_ is empty when a writer
|
|
// enqueues itself, because there is no need to wait (or even to
|
|
// create the mutex and condvar used to wait) in that case. This is
|
|
// a terminal state unless the leader chooses to make this a parallel
|
|
// batch, in which case the last parallel worker to finish will move
|
|
// the leader to STATE_COMPLETED.
|
|
STATE_GROUP_LEADER = 2,
|
|
|
|
// The state used to inform a waiting writer that it has become the
|
|
// leader of memtable writer group. The leader will either write
|
|
// memtable for the whole group, or launch a parallel group write
|
|
// to memtable by calling LaunchParallelMemTableWrite.
|
|
STATE_MEMTABLE_WRITER_LEADER = 4,
|
|
|
|
// The state used to inform a waiting writer that it has become a
|
|
// parallel memtable writer. It can be the group leader who launch the
|
|
// parallel writer group, or one of the followers. The writer should then
|
|
// apply its batch to the memtable concurrently and call
|
|
// CompleteParallelMemTableWriter.
|
|
STATE_PARALLEL_MEMTABLE_WRITER = 8,
|
|
|
|
// A follower whose writes have been applied, or a parallel leader
|
|
// whose followers have all finished their work. This is a terminal
|
|
// state.
|
|
STATE_COMPLETED = 16,
|
|
|
|
// A state indicating that the thread may be waiting using StateMutex()
|
|
// and StateCondVar()
|
|
STATE_LOCKED_WAITING = 32,
|
|
};
|
|
|
|
struct Writer;
|
|
|
|
struct WriteGroup {
|
|
Writer* leader = nullptr;
|
|
Writer* last_writer = nullptr;
|
|
SequenceNumber last_sequence;
|
|
// before running goes to zero, status needs leader->StateMutex()
|
|
Status status;
|
|
std::atomic<size_t> running;
|
|
size_t size = 0;
|
|
|
|
struct Iterator {
|
|
Writer* writer;
|
|
Writer* last_writer;
|
|
|
|
explicit Iterator(Writer* w, Writer* last)
|
|
: writer(w), last_writer(last) {}
|
|
|
|
Writer* operator*() const { return writer; }
|
|
|
|
Iterator& operator++() {
|
|
assert(writer != nullptr);
|
|
if (writer == last_writer) {
|
|
writer = nullptr;
|
|
} else {
|
|
writer = writer->link_newer;
|
|
}
|
|
return *this;
|
|
}
|
|
|
|
bool operator!=(const Iterator& other) const {
|
|
return writer != other.writer;
|
|
}
|
|
};
|
|
|
|
Iterator begin() const { return Iterator(leader, last_writer); }
|
|
Iterator end() const { return Iterator(nullptr, nullptr); }
|
|
};
|
|
|
|
// Information kept for every waiting writer.
|
|
struct Writer {
|
|
WriteBatch* batch;
|
|
bool sync;
|
|
bool no_slowdown;
|
|
bool disable_wal;
|
|
Env::IOPriority rate_limiter_priority;
|
|
bool disable_memtable;
|
|
size_t batch_cnt; // if non-zero, number of sub-batches in the write batch
|
|
size_t protection_bytes_per_key;
|
|
PreReleaseCallback* pre_release_callback;
|
|
uint64_t log_used; // log number that this batch was inserted into
|
|
uint64_t log_ref; // log number that memtable insert should reference
|
|
WriteCallback* callback;
|
|
bool made_waitable; // records lazy construction of mutex and cv
|
|
std::atomic<uint8_t> state; // write under StateMutex() or pre-link
|
|
WriteGroup* write_group;
|
|
SequenceNumber sequence; // the sequence number to use for the first key
|
|
Status status;
|
|
Status callback_status; // status returned by callback->Callback()
|
|
|
|
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
|
|
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
|
|
Writer* link_older; // read/write only before linking, or as leader
|
|
Writer* link_newer; // lazy, read/write only before linking, or as leader
|
|
|
|
Writer()
|
|
: batch(nullptr),
|
|
sync(false),
|
|
no_slowdown(false),
|
|
disable_wal(false),
|
|
rate_limiter_priority(Env::IOPriority::IO_TOTAL),
|
|
disable_memtable(false),
|
|
batch_cnt(0),
|
|
protection_bytes_per_key(0),
|
|
pre_release_callback(nullptr),
|
|
log_used(0),
|
|
log_ref(0),
|
|
callback(nullptr),
|
|
made_waitable(false),
|
|
state(STATE_INIT),
|
|
write_group(nullptr),
|
|
sequence(kMaxSequenceNumber),
|
|
link_older(nullptr),
|
|
link_newer(nullptr) {}
|
|
|
|
Writer(const WriteOptions& write_options, WriteBatch* _batch,
|
|
WriteCallback* _callback, uint64_t _log_ref, bool _disable_memtable,
|
|
size_t _batch_cnt = 0,
|
|
PreReleaseCallback* _pre_release_callback = nullptr)
|
|
: batch(_batch),
|
|
sync(write_options.sync),
|
|
no_slowdown(write_options.no_slowdown),
|
|
disable_wal(write_options.disableWAL),
|
|
rate_limiter_priority(write_options.rate_limiter_priority),
|
|
disable_memtable(_disable_memtable),
|
|
batch_cnt(_batch_cnt),
|
|
protection_bytes_per_key(_batch->GetProtectionBytesPerKey()),
|
|
pre_release_callback(_pre_release_callback),
|
|
log_used(0),
|
|
log_ref(_log_ref),
|
|
callback(_callback),
|
|
made_waitable(false),
|
|
state(STATE_INIT),
|
|
write_group(nullptr),
|
|
sequence(kMaxSequenceNumber),
|
|
link_older(nullptr),
|
|
link_newer(nullptr) {}
|
|
|
|
~Writer() {
|
|
if (made_waitable) {
|
|
StateMutex().~mutex();
|
|
StateCV().~condition_variable();
|
|
}
|
|
status.PermitUncheckedError();
|
|
callback_status.PermitUncheckedError();
|
|
}
|
|
|
|
bool CheckCallback(DB* db) {
|
|
if (callback != nullptr) {
|
|
callback_status = callback->Callback(db);
|
|
}
|
|
return callback_status.ok();
|
|
}
|
|
|
|
void CreateMutex() {
|
|
if (!made_waitable) {
|
|
// Note that made_waitable is tracked separately from state
|
|
// transitions, because we can't atomically create the mutex and
|
|
// link into the list.
|
|
made_waitable = true;
|
|
new (&state_mutex_bytes) std::mutex;
|
|
new (&state_cv_bytes) std::condition_variable;
|
|
}
|
|
}
|
|
|
|
// returns the aggregate status of this Writer
|
|
Status FinalStatus() {
|
|
if (!status.ok()) {
|
|
// a non-ok memtable write status takes presidence
|
|
assert(callback == nullptr || callback_status.ok());
|
|
return status;
|
|
} else if (!callback_status.ok()) {
|
|
// if the callback failed then that is the status we want
|
|
// because a memtable insert should not have been attempted
|
|
assert(callback != nullptr);
|
|
assert(status.ok());
|
|
return callback_status;
|
|
} else {
|
|
// if there is no callback then we only care about
|
|
// the memtable insert status
|
|
assert(callback == nullptr || callback_status.ok());
|
|
return status;
|
|
}
|
|
}
|
|
|
|
bool CallbackFailed() {
|
|
return (callback != nullptr) && !callback_status.ok();
|
|
}
|
|
|
|
bool ShouldWriteToMemtable() {
|
|
return status.ok() && !CallbackFailed() && !disable_memtable;
|
|
}
|
|
|
|
bool ShouldWriteToWAL() {
|
|
return status.ok() && !CallbackFailed() && !disable_wal;
|
|
}
|
|
|
|
// No other mutexes may be acquired while holding StateMutex(), it is
|
|
// always last in the order
|
|
std::mutex& StateMutex() {
|
|
assert(made_waitable);
|
|
return *static_cast<std::mutex*>(static_cast<void*>(&state_mutex_bytes));
|
|
}
|
|
|
|
std::condition_variable& StateCV() {
|
|
assert(made_waitable);
|
|
return *static_cast<std::condition_variable*>(
|
|
static_cast<void*>(&state_cv_bytes));
|
|
}
|
|
};
|
|
|
|
struct AdaptationContext {
|
|
const char* name;
|
|
std::atomic<int32_t> value;
|
|
|
|
explicit AdaptationContext(const char* name0) : name(name0), value(0) {}
|
|
};
|
|
|
|
explicit WriteThread(const ImmutableDBOptions& db_options);
|
|
|
|
virtual ~WriteThread() = default;
|
|
|
|
// IMPORTANT: None of the methods in this class rely on the db mutex
|
|
// for correctness. All of the methods except JoinBatchGroup and
|
|
// EnterUnbatched may be called either with or without the db mutex held.
|
|
// Correctness is maintained by ensuring that only a single thread is
|
|
// a leader at a time.
|
|
|
|
// Registers w as ready to become part of a batch group, waits until the
|
|
// caller should perform some work, and returns the current state of the
|
|
// writer. If w has become the leader of a write batch group, returns
|
|
// STATE_GROUP_LEADER. If w has been made part of a sequential batch
|
|
// group and the leader has performed the write, returns STATE_DONE.
|
|
// If w has been made part of a parallel batch group and is responsible
|
|
// for updating the memtable, returns STATE_PARALLEL_MEMTABLE_WRITER.
|
|
//
|
|
// The db mutex SHOULD NOT be held when calling this function, because
|
|
// it will block.
|
|
//
|
|
// Writer* w: Writer to be executed as part of a batch group
|
|
void JoinBatchGroup(Writer* w);
|
|
|
|
// Constructs a write batch group led by leader, which should be a
|
|
// Writer passed to JoinBatchGroup on the current thread.
|
|
//
|
|
// Writer* leader: Writer that is STATE_GROUP_LEADER
|
|
// WriteGroup* write_group: Out-param of group members
|
|
// returns: Total batch group byte size
|
|
size_t EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group);
|
|
|
|
// Unlinks the Writer-s in a batch group, wakes up the non-leaders,
|
|
// and wakes up the next leader (if any).
|
|
//
|
|
// WriteGroup* write_group: the write group
|
|
// Status status: Status of write operation
|
|
void ExitAsBatchGroupLeader(WriteGroup& write_group, Status& status);
|
|
|
|
// Exit batch group on behalf of batch group leader.
|
|
void ExitAsBatchGroupFollower(Writer* w);
|
|
|
|
// Constructs a write batch group led by leader from newest_memtable_writers_
|
|
// list. The leader should either write memtable for the whole group and
|
|
// call ExitAsMemTableWriter, or launch parallel memtable write through
|
|
// LaunchParallelMemTableWriters.
|
|
void EnterAsMemTableWriter(Writer* leader, WriteGroup* write_grup);
|
|
|
|
// Memtable writer group leader, or the last finished writer in a parallel
|
|
// write group, exit from the newest_memtable_writers_ list, and wake up
|
|
// the next leader if needed.
|
|
void ExitAsMemTableWriter(Writer* self, WriteGroup& write_group);
|
|
|
|
// Causes JoinBatchGroup to return STATE_PARALLEL_MEMTABLE_WRITER for all of
|
|
// the non-leader members of this write batch group. Sets Writer::sequence
|
|
// before waking them up.
|
|
//
|
|
// WriteGroup* write_group: Extra state used to coordinate the parallel add
|
|
void LaunchParallelMemTableWriters(WriteGroup* write_group);
|
|
|
|
// Reports the completion of w's batch to the parallel group leader, and
|
|
// waits for the rest of the parallel batch to complete. Returns true
|
|
// if this thread is the last to complete, and hence should advance
|
|
// the sequence number and then call EarlyExitParallelGroup, false if
|
|
// someone else has already taken responsibility for that.
|
|
bool CompleteParallelMemTableWriter(Writer* w);
|
|
|
|
// Waits for all preceding writers (unlocking mu while waiting), then
|
|
// registers w as the currently proceeding writer.
|
|
//
|
|
// Writer* w: A Writer not eligible for batching
|
|
// InstrumentedMutex* mu: The db mutex, to unlock while waiting
|
|
// REQUIRES: db mutex held
|
|
void EnterUnbatched(Writer* w, InstrumentedMutex* mu);
|
|
|
|
// Completes a Writer begun with EnterUnbatched, unblocking subsequent
|
|
// writers.
|
|
void ExitUnbatched(Writer* w);
|
|
|
|
// Wait for all parallel memtable writers to finish, in case pipelined
|
|
// write is enabled.
|
|
void WaitForMemTableWriters();
|
|
|
|
SequenceNumber UpdateLastSequence(SequenceNumber sequence) {
|
|
if (sequence > last_sequence_) {
|
|
last_sequence_ = sequence;
|
|
}
|
|
return last_sequence_;
|
|
}
|
|
|
|
// Insert a dummy writer at the tail of the write queue to indicate a write
|
|
// stall, and fail any writers in the queue with no_slowdown set to true
|
|
void BeginWriteStall();
|
|
|
|
// Remove the dummy writer and wake up waiting writers
|
|
void EndWriteStall();
|
|
|
|
private:
|
|
// See AwaitState.
|
|
const uint64_t max_yield_usec_;
|
|
const uint64_t slow_yield_usec_;
|
|
|
|
// Allow multiple writers write to memtable concurrently.
|
|
const bool allow_concurrent_memtable_write_;
|
|
|
|
// Enable pipelined write to WAL and memtable.
|
|
const bool enable_pipelined_write_;
|
|
|
|
// The maximum limit of number of bytes that are written in a single batch
|
|
// of WAL or memtable write. It is followed when the leader write size
|
|
// is larger than 1/8 of this limit.
|
|
const uint64_t max_write_batch_group_size_bytes;
|
|
|
|
// Points to the newest pending writer. Only leader can remove
|
|
// elements, adding can be done lock-free by anybody.
|
|
std::atomic<Writer*> newest_writer_;
|
|
|
|
// Points to the newest pending memtable writer. Used only when pipelined
|
|
// write is enabled.
|
|
std::atomic<Writer*> newest_memtable_writer_;
|
|
|
|
// The last sequence that have been consumed by a writer. The sequence
|
|
// is not necessary visible to reads because the writer can be ongoing.
|
|
SequenceNumber last_sequence_;
|
|
|
|
// A dummy writer to indicate a write stall condition. This will be inserted
|
|
// at the tail of the writer queue by the leader, so newer writers can just
|
|
// check for this and bail
|
|
Writer write_stall_dummy_;
|
|
|
|
// Mutex and condvar for writers to block on a write stall. During a write
|
|
// stall, writers with no_slowdown set to false will wait on this rather
|
|
// on the writer queue
|
|
port::Mutex stall_mu_;
|
|
port::CondVar stall_cv_;
|
|
|
|
// Waits for w->state & goal_mask using w->StateMutex(). Returns
|
|
// the state that satisfies goal_mask.
|
|
uint8_t BlockingAwaitState(Writer* w, uint8_t goal_mask);
|
|
|
|
// Blocks until w->state & goal_mask, returning the state value
|
|
// that satisfied the predicate. Uses ctx to adaptively use
|
|
// std::this_thread::yield() to avoid mutex overheads. ctx should be
|
|
// a context-dependent static.
|
|
uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx);
|
|
|
|
// Set writer state and wake the writer up if it is waiting.
|
|
void SetState(Writer* w, uint8_t new_state);
|
|
|
|
// Links w into the newest_writer list. Return true if w was linked directly
|
|
// into the leader position. Safe to call from multiple threads without
|
|
// external locking.
|
|
bool LinkOne(Writer* w, std::atomic<Writer*>* newest_writer);
|
|
|
|
// Link write group into the newest_writer list as a whole, while keeping the
|
|
// order of the writers unchanged. Return true if the group was linked
|
|
// directly into the leader position.
|
|
bool LinkGroup(WriteGroup& write_group, std::atomic<Writer*>* newest_writer);
|
|
|
|
// Computes any missing link_newer links. Should not be called
|
|
// concurrently with itself.
|
|
void CreateMissingNewerLinks(Writer* head);
|
|
|
|
// Starting from a pending writer, follow link_older to search for next
|
|
// leader, until we hit boundary.
|
|
Writer* FindNextLeader(Writer* pending_writer, Writer* boundary);
|
|
|
|
// Set the leader in write_group to completed state and remove it from the
|
|
// write group.
|
|
void CompleteLeader(WriteGroup& write_group);
|
|
|
|
// Set a follower in write_group to completed state and remove it from the
|
|
// write group.
|
|
void CompleteFollower(Writer* w, WriteGroup& write_group);
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|