mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 11:43:49 +00:00
91166012c8
Summary: Previously, the flushes triggered by `WriteBufferManager` could affect the same CF repeatedly if it happens to get consecutive writes. Such flushes are not particularly useful for reducing memory usage since they switch nearly-empty memtables to immutable while they've just begun filling their first arena block. In fact they may not even reduce the mutable memory count if they involve replacing one mutable memtable containing one arena block with a new mutable memtable containing one arena block. Further, if such switches happen even a few times before a flush finishes, the immutable memtable limit will be reached and writes will stall. This PR adds a heuristic to not switch memtables to immutable for CFs that already have one or more immutable memtables awaiting flush. There is a memory usage regression if the user continues writing to the same CF, that DB does not have any CFs eligible for switching, flushes are not finishing, and the `WriteBufferManager` was constructed with `allow_stall=false`. Before, it would grow by switching nearly empty memtables until writes stall. Now, it would grow by filling memtables until writes stall. This feels like an acceptable behavior change because users who prefer to stall over violate the memory limit should be using `allow_stall=true`, which is unaffected by this PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6364 Test Plan: - Command: `rm -rf /dev/shm/dbbench/ && TEST_TMPDIR=/dev/shm ./db_bench -benchmarks=fillrandom -num_multi_db=8 -num_column_families=2 -write_buffer_size=4194304 -db_write_buffer_size=16777216 -compression_type=none -statistics=true -target_file_size_base=4194304 -max_bytes_for_level_base=16777216` - `rocksdb.db.write.stall` count before this PR: 175 - `rocksdb.db.write.stall` count after this PR: 0 Reviewed By: jay-zhuang Differential Revision: D20167197 Pulled By: ajkr fbshipit-source-id: 4a64064e9bc33d57c0a35f15547542d0191d0cb7
863 lines
29 KiB
C++
863 lines
29 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/db_test_util.h"
|
|
#include "db/write_thread.h"
|
|
#include "port/stack_trace.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class DBWriteBufferManagerTest : public DBTestBase,
|
|
public testing::WithParamInterface<bool> {
|
|
public:
|
|
DBWriteBufferManagerTest()
|
|
: DBTestBase("db_write_buffer_manager_test", /*env_do_fsync=*/false) {}
|
|
bool cost_cache_;
|
|
};
|
|
|
|
// Writes across several column families of one DB that share a
// WriteBufferManager (stalls allowed) until the manager's limit is expected
// to be exceeded, then checks that a subsequent write still succeeds.
TEST_P(DBWriteBufferManagerTest, SharedBufferAcrossCFs1) {
  Options options = CurrentOptions();
  options.arena_block_size = 4096;
  options.write_buffer_size = 500000;  // this is never hit
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
  ASSERT_LT(cache->GetUsage(), 256 * 1024);
  cost_cache_ = GetParam();

  // The manager gets the cache only when the test parameter is true.
  if (cost_cache_) {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, cache, true));
  } else {
    options.write_buffer_manager.reset(
        new WriteBufferManager(100000, nullptr, true));
  }

  WriteOptions wo;
  wo.disableWAL = true;

  CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  // A failed flush would invalidate everything below, so check its status
  // instead of dropping it.
  ASSERT_OK(Flush(3));
  ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
  ASSERT_OK(Flush(0));

  // Write to "Default", "cf2" and "cf3".
  ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
  ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
  ASSERT_OK(Put(2, Key(1), DummyString(1), wo));

  ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
  // WriteBufferManager::buffer_size_ has been exceeded once the previous
  // write is completed.

  // This makes sure the write goes through and that, if a stall was in
  // effect, it ends.
  ASSERT_OK(Put(0, Key(2), DummyString(1), wo));
}
|
|
|
|
// Test Single DB with multiple writer threads get blocked when
|
|
// WriteBufferManager execeeds buffer_size_ and flush is waiting to be
|
|
// finished.
|
|
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferAcrossCFs2) {
|
|
Options options = CurrentOptions();
|
|
options.arena_block_size = 4096;
|
|
options.write_buffer_size = 500000; // this is never hit
|
|
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
|
|
ASSERT_LT(cache->GetUsage(), 256 * 1024);
|
|
cost_cache_ = GetParam();
|
|
|
|
if (cost_cache_) {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, cache, true));
|
|
} else {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, nullptr, true));
|
|
}
|
|
WriteOptions wo;
|
|
wo.disableWAL = true;
|
|
|
|
CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
|
|
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
|
|
Flush(3);
|
|
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
|
|
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
|
|
Flush(0);
|
|
|
|
// Write to "Default", "cf2" and "cf3". No flush will be triggered.
|
|
ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
|
|
ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
|
|
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
|
|
|
|
ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
|
|
// WriteBufferManager::buffer_size_ has exceeded after the previous write is
|
|
// completed.
|
|
|
|
std::unordered_set<WriteThread::Writer*> w_set;
|
|
std::vector<port::Thread> threads;
|
|
int wait_count_db = 0;
|
|
int num_writers = 4;
|
|
InstrumentedMutex mutex;
|
|
InstrumentedCondVar cv(&mutex);
|
|
std::atomic<int> thread_num(0);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
|
|
"DBImpl::BackgroundCallFlush:start"}});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WBMStallInterface::BlockDB", [&](void*) {
|
|
InstrumentedMutexLock lock(&mutex);
|
|
wait_count_db++;
|
|
cv.SignalAll();
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WriteThread::WriteStall::Wait", [&](void* arg) {
|
|
InstrumentedMutexLock lock(&mutex);
|
|
WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
|
|
w_set.insert(w);
|
|
// Allow the flush to continue if all writer threads are blocked.
|
|
if (w_set.size() == (unsigned long)num_writers) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
bool s = true;
|
|
|
|
std::function<void(int)> writer = [&](int cf) {
|
|
int a = thread_num.fetch_add(1);
|
|
std::string key = "foo" + std::to_string(a);
|
|
Status tmp = Put(cf, Slice(key), DummyString(1), wo);
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s = s && tmp.ok();
|
|
};
|
|
|
|
// Flow:
|
|
// main_writer thread will write but will be blocked (as Flush will on hold,
|
|
// buffer_size_ has exceeded, thus will create stall in effect).
|
|
// |
|
|
// |
|
|
// multiple writer threads will be created to write across multiple columns
|
|
// and they will be blocked.
|
|
// |
|
|
// |
|
|
// Last writer thread will write and when its blocked it will signal Flush to
|
|
// continue to clear the stall.
|
|
|
|
threads.emplace_back(writer, 1);
|
|
// Wait untill first thread (main_writer) writing to DB is blocked and then
|
|
// create the multiple writers which will be blocked from getting added to the
|
|
// queue because stall is in effect.
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
while (wait_count_db != 1) {
|
|
cv.Wait();
|
|
}
|
|
}
|
|
for (int i = 0; i < num_writers; i++) {
|
|
threads.emplace_back(writer, i % 4);
|
|
}
|
|
for (auto& t : threads) {
|
|
t.join();
|
|
}
|
|
|
|
ASSERT_TRUE(s);
|
|
|
|
// Number of DBs blocked.
|
|
ASSERT_EQ(wait_count_db, 1);
|
|
// Number of Writer threads blocked.
|
|
ASSERT_EQ(w_set.size(), num_writers);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
// Test multiple DBs get blocked when WriteBufferManager limit exceeds and flush
|
|
// is waiting to be finished but DBs tries to write meanwhile.
|
|
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB) {
|
|
std::vector<std::string> dbnames;
|
|
std::vector<DB*> dbs;
|
|
int num_dbs = 3;
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
dbs.push_back(nullptr);
|
|
dbnames.push_back(
|
|
test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
|
|
}
|
|
|
|
Options options = CurrentOptions();
|
|
options.arena_block_size = 4096;
|
|
options.write_buffer_size = 500000; // this is never hit
|
|
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
|
|
ASSERT_LT(cache->GetUsage(), 256 * 1024);
|
|
cost_cache_ = GetParam();
|
|
|
|
if (cost_cache_) {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, cache, true));
|
|
} else {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, nullptr, true));
|
|
}
|
|
CreateAndReopenWithCF({"cf1", "cf2"}, options);
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(DestroyDB(dbnames[i], options));
|
|
ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
|
|
}
|
|
WriteOptions wo;
|
|
wo.disableWAL = true;
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
|
|
}
|
|
// Insert to db_.
|
|
ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));
|
|
|
|
// WriteBufferManager Limit exceeded.
|
|
std::vector<port::Thread> threads;
|
|
int wait_count_db = 0;
|
|
InstrumentedMutex mutex;
|
|
InstrumentedCondVar cv(&mutex);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
|
|
"DBImpl::BackgroundCallFlush:start"}});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WBMStallInterface::BlockDB", [&](void*) {
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
wait_count_db++;
|
|
cv.Signal();
|
|
// Since this is the last DB, signal Flush to continue.
|
|
if (wait_count_db == num_dbs + 1) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
bool s = true;
|
|
|
|
// Write to DB.
|
|
std::function<void(DB*)> write_db = [&](DB* db) {
|
|
Status tmp = db->Put(wo, Key(3), DummyString(1));
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s = s && tmp.ok();
|
|
};
|
|
|
|
// Flow:
|
|
// db_ will write and will be blocked (as Flush will on hold and will create
|
|
// stall in effect).
|
|
// |
|
|
// multiple dbs writers will be created to write to that db and they will be
|
|
// blocked.
|
|
// |
|
|
// |
|
|
// Last writer will write and when its blocked it will signal Flush to
|
|
// continue to clear the stall.
|
|
|
|
threads.emplace_back(write_db, db_);
|
|
// Wait untill first DB is blocked and then create the multiple writers for
|
|
// different DBs which will be blocked from getting added to the queue because
|
|
// stall is in effect.
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
while (wait_count_db != 1) {
|
|
cv.Wait();
|
|
}
|
|
}
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
threads.emplace_back(write_db, dbs[i]);
|
|
}
|
|
for (auto& t : threads) {
|
|
t.join();
|
|
}
|
|
|
|
ASSERT_TRUE(s);
|
|
ASSERT_EQ(num_dbs + 1, wait_count_db);
|
|
// Clean up DBs.
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(dbs[i]->Close());
|
|
ASSERT_OK(DestroyDB(dbnames[i], options));
|
|
delete dbs[i];
|
|
}
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
// Test multiple threads writing across multiple DBs and multiple columns get
|
|
// blocked when stall by WriteBufferManager is in effect.
|
|
TEST_P(DBWriteBufferManagerTest, SharedWriteBufferLimitAcrossDB1) {
|
|
std::vector<std::string> dbnames;
|
|
std::vector<DB*> dbs;
|
|
int num_dbs = 3;
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
dbs.push_back(nullptr);
|
|
dbnames.push_back(
|
|
test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
|
|
}
|
|
|
|
Options options = CurrentOptions();
|
|
options.arena_block_size = 4096;
|
|
options.write_buffer_size = 500000; // this is never hit
|
|
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
|
|
ASSERT_LT(cache->GetUsage(), 256 * 1024);
|
|
cost_cache_ = GetParam();
|
|
|
|
if (cost_cache_) {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, cache, true));
|
|
} else {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, nullptr, true));
|
|
}
|
|
CreateAndReopenWithCF({"cf1", "cf2"}, options);
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(DestroyDB(dbnames[i], options));
|
|
ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
|
|
}
|
|
WriteOptions wo;
|
|
wo.disableWAL = true;
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
|
|
}
|
|
// Insert to db_.
|
|
ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));
|
|
|
|
// WriteBufferManager::buffer_size_ has exceeded after the previous write to
|
|
// dbs[0] is completed.
|
|
std::vector<port::Thread> threads;
|
|
int wait_count_db = 0;
|
|
InstrumentedMutex mutex;
|
|
InstrumentedCondVar cv(&mutex);
|
|
std::unordered_set<WriteThread::Writer*> w_set;
|
|
std::vector<port::Thread> writer_threads;
|
|
std::atomic<int> thread_num(0);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
|
|
"DBImpl::BackgroundCallFlush:start"}});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WBMStallInterface::BlockDB", [&](void*) {
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
wait_count_db++;
|
|
thread_num.fetch_add(1);
|
|
cv.Signal();
|
|
// Allow the flush to continue if all writer threads are blocked.
|
|
if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WriteThread::WriteStall::Wait", [&](void* arg) {
|
|
WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
w_set.insert(w);
|
|
thread_num.fetch_add(1);
|
|
// Allow the flush continue if all writer threads are blocked.
|
|
if (thread_num.load(std::memory_order_relaxed) == 2 * num_dbs + 1) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
bool s1 = true, s2 = true;
|
|
// Write to multiple columns of db_.
|
|
std::function<void(int)> write_cf = [&](int cf) {
|
|
Status tmp = Put(cf, Key(3), DummyString(1), wo);
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s1 = s1 && tmp.ok();
|
|
};
|
|
// Write to multiple DBs.
|
|
std::function<void(DB*)> write_db = [&](DB* db) {
|
|
Status tmp = db->Put(wo, Key(3), DummyString(1));
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s2 = s2 && tmp.ok();
|
|
};
|
|
|
|
// Flow:
|
|
// thread will write to db_ will be blocked (as Flush will on hold,
|
|
// buffer_size_ has exceeded and will create stall in effect).
|
|
// |
|
|
// |
|
|
// multiple writers threads writing to different DBs and to db_ across
|
|
// multiple columns will be created and they will be blocked due to stall.
|
|
// |
|
|
// |
|
|
// Last writer thread will write and when its blocked it will signal Flush to
|
|
// continue to clear the stall.
|
|
threads.emplace_back(write_db, db_);
|
|
// Wait untill first thread is blocked and then create the multiple writer
|
|
// threads.
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
while (wait_count_db != 1) {
|
|
cv.Wait();
|
|
}
|
|
}
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
// Write to multiple columns of db_.
|
|
writer_threads.emplace_back(write_cf, i % 3);
|
|
// Write to different dbs.
|
|
threads.emplace_back(write_db, dbs[i]);
|
|
}
|
|
for (auto& t : threads) {
|
|
t.join();
|
|
}
|
|
for (auto& t : writer_threads) {
|
|
t.join();
|
|
}
|
|
|
|
ASSERT_TRUE(s1);
|
|
ASSERT_TRUE(s2);
|
|
|
|
// Number of DBs blocked.
|
|
ASSERT_EQ(num_dbs + 1, wait_count_db);
|
|
// Number of Writer threads blocked.
|
|
ASSERT_EQ(w_set.size(), num_dbs);
|
|
// Clean up DBs.
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(dbs[i]->Close());
|
|
ASSERT_OK(DestroyDB(dbnames[i], options));
|
|
delete dbs[i];
|
|
}
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
// Test multiple threads writing across multiple columns of db_ by passing
|
|
// different values to WriteOption.no_slown_down.
|
|
TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsSingleDB) {
|
|
Options options = CurrentOptions();
|
|
options.arena_block_size = 4096;
|
|
options.write_buffer_size = 500000; // this is never hit
|
|
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
|
|
ASSERT_LT(cache->GetUsage(), 256 * 1024);
|
|
cost_cache_ = GetParam();
|
|
|
|
if (cost_cache_) {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, cache, true));
|
|
} else {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, nullptr, true));
|
|
}
|
|
WriteOptions wo;
|
|
wo.disableWAL = true;
|
|
|
|
CreateAndReopenWithCF({"cf1", "cf2", "cf3"}, options);
|
|
|
|
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
|
|
Flush(3);
|
|
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
|
|
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
|
|
Flush(0);
|
|
|
|
// Write to "Default", "cf2" and "cf3". No flush will be triggered.
|
|
ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
|
|
ASSERT_OK(Put(0, Key(1), DummyString(40000), wo));
|
|
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
|
|
ASSERT_OK(Put(3, Key(2), DummyString(40000), wo));
|
|
|
|
// WriteBufferManager::buffer_size_ has exceeded after the previous write to
|
|
// db_ is completed.
|
|
|
|
std::unordered_set<WriteThread::Writer*> w_slowdown_set;
|
|
std::vector<port::Thread> threads;
|
|
int wait_count_db = 0;
|
|
int num_writers = 4;
|
|
InstrumentedMutex mutex;
|
|
InstrumentedCondVar cv(&mutex);
|
|
std::atomic<int> thread_num(0);
|
|
std::atomic<int> w_no_slowdown(0);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
|
|
"DBImpl::BackgroundCallFlush:start"}});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WBMStallInterface::BlockDB", [&](void*) {
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
wait_count_db++;
|
|
cv.SignalAll();
|
|
}
|
|
});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WriteThread::WriteStall::Wait", [&](void* arg) {
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
|
|
w_slowdown_set.insert(w);
|
|
// Allow the flush continue if all writer threads are blocked.
|
|
if (w_slowdown_set.size() + (unsigned long)w_no_slowdown.load(
|
|
std::memory_order_relaxed) ==
|
|
(unsigned long)num_writers) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
bool s1 = true, s2 = true;
|
|
|
|
std::function<void(int)> write_slow_down = [&](int cf) {
|
|
int a = thread_num.fetch_add(1);
|
|
std::string key = "foo" + std::to_string(a);
|
|
WriteOptions write_op;
|
|
write_op.no_slowdown = false;
|
|
Status tmp = Put(cf, Slice(key), DummyString(1), write_op);
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s1 = s1 && tmp.ok();
|
|
};
|
|
|
|
std::function<void(int)> write_no_slow_down = [&](int cf) {
|
|
int a = thread_num.fetch_add(1);
|
|
std::string key = "foo" + std::to_string(a);
|
|
WriteOptions write_op;
|
|
write_op.no_slowdown = true;
|
|
Status tmp = Put(cf, Slice(key), DummyString(1), write_op);
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s2 = s2 && !tmp.ok();
|
|
w_no_slowdown.fetch_add(1);
|
|
// Allow the flush continue if all writer threads are blocked.
|
|
if (w_slowdown_set.size() +
|
|
(unsigned long)w_no_slowdown.load(std::memory_order_relaxed) ==
|
|
(unsigned long)num_writers) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
}
|
|
};
|
|
|
|
// Flow:
|
|
// main_writer thread will write but will be blocked (as Flush will on hold,
|
|
// buffer_size_ has exceeded, thus will create stall in effect).
|
|
// |
|
|
// |
|
|
// multiple writer threads will be created to write across multiple columns
|
|
// with different values of WriteOptions.no_slowdown. Some of them will
|
|
// be blocked and some of them will return with Incomplete status.
|
|
// |
|
|
// |
|
|
// Last writer thread will write and when its blocked/return it will signal
|
|
// Flush to continue to clear the stall.
|
|
threads.emplace_back(write_slow_down, 1);
|
|
// Wait untill first thread (main_writer) writing to DB is blocked and then
|
|
// create the multiple writers which will be blocked from getting added to the
|
|
// queue because stall is in effect.
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
while (wait_count_db != 1) {
|
|
cv.Wait();
|
|
}
|
|
}
|
|
|
|
for (int i = 0; i < num_writers; i += 2) {
|
|
threads.emplace_back(write_no_slow_down, (i) % 4);
|
|
threads.emplace_back(write_slow_down, (i + 1) % 4);
|
|
}
|
|
for (auto& t : threads) {
|
|
t.join();
|
|
}
|
|
|
|
ASSERT_TRUE(s1);
|
|
ASSERT_TRUE(s2);
|
|
// Number of DBs blocked.
|
|
ASSERT_EQ(wait_count_db, 1);
|
|
// Number of Writer threads blocked.
|
|
ASSERT_EQ(w_slowdown_set.size(), num_writers / 2);
|
|
// Number of Writer threads with WriteOptions.no_slowdown = true.
|
|
ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_writers / 2);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
// Test multiple threads writing across multiple columns of db_ and different
|
|
// dbs by passing different values to WriteOption.no_slown_down.
|
|
TEST_P(DBWriteBufferManagerTest, MixedSlowDownOptionsMultipleDB) {
|
|
std::vector<std::string> dbnames;
|
|
std::vector<DB*> dbs;
|
|
int num_dbs = 4;
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
dbs.push_back(nullptr);
|
|
dbnames.push_back(
|
|
test::PerThreadDBPath("db_shared_wb_db" + std::to_string(i)));
|
|
}
|
|
|
|
Options options = CurrentOptions();
|
|
options.arena_block_size = 4096;
|
|
options.write_buffer_size = 500000; // this is never hit
|
|
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
|
|
ASSERT_LT(cache->GetUsage(), 256 * 1024);
|
|
cost_cache_ = GetParam();
|
|
|
|
if (cost_cache_) {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, cache, true));
|
|
} else {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(100000, nullptr, true));
|
|
}
|
|
CreateAndReopenWithCF({"cf1", "cf2"}, options);
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(DestroyDB(dbnames[i], options));
|
|
ASSERT_OK(DB::Open(options, dbnames[i], &(dbs[i])));
|
|
}
|
|
WriteOptions wo;
|
|
wo.disableWAL = true;
|
|
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(dbs[i]->Put(wo, Key(1), DummyString(20000)));
|
|
}
|
|
// Insert to db_.
|
|
ASSERT_OK(Put(0, Key(1), DummyString(30000), wo));
|
|
|
|
// WriteBufferManager::buffer_size_ has exceeded after the previous write to
|
|
// dbs[0] is completed.
|
|
std::vector<port::Thread> threads;
|
|
int wait_count_db = 0;
|
|
InstrumentedMutex mutex;
|
|
InstrumentedCondVar cv(&mutex);
|
|
std::unordered_set<WriteThread::Writer*> w_slowdown_set;
|
|
std::vector<port::Thread> writer_threads;
|
|
std::atomic<int> thread_num(0);
|
|
std::atomic<int> w_no_slowdown(0);
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
|
{{"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0",
|
|
"DBImpl::BackgroundCallFlush:start"}});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WBMStallInterface::BlockDB", [&](void*) {
|
|
InstrumentedMutexLock lock(&mutex);
|
|
wait_count_db++;
|
|
cv.Signal();
|
|
// Allow the flush continue if all writer threads are blocked.
|
|
if (w_slowdown_set.size() +
|
|
(unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
|
|
wait_count_db) ==
|
|
(unsigned long)(2 * num_dbs + 1)) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
});
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
|
"WriteThread::WriteStall::Wait", [&](void* arg) {
|
|
WriteThread::Writer* w = reinterpret_cast<WriteThread::Writer*>(arg);
|
|
InstrumentedMutexLock lock(&mutex);
|
|
w_slowdown_set.insert(w);
|
|
// Allow the flush continue if all writer threads are blocked.
|
|
if (w_slowdown_set.size() +
|
|
(unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
|
|
wait_count_db) ==
|
|
(unsigned long)(2 * num_dbs + 1)) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
bool s1 = true, s2 = true;
|
|
std::function<void(DB*)> write_slow_down = [&](DB* db) {
|
|
int a = thread_num.fetch_add(1);
|
|
std::string key = "foo" + std::to_string(a);
|
|
WriteOptions write_op;
|
|
write_op.no_slowdown = false;
|
|
Status tmp = db->Put(write_op, Slice(key), DummyString(1));
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s1 = s1 && tmp.ok();
|
|
};
|
|
|
|
std::function<void(DB*)> write_no_slow_down = [&](DB* db) {
|
|
int a = thread_num.fetch_add(1);
|
|
std::string key = "foo" + std::to_string(a);
|
|
WriteOptions write_op;
|
|
write_op.no_slowdown = true;
|
|
Status tmp = db->Put(write_op, Slice(key), DummyString(1));
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
s2 = s2 && !tmp.ok();
|
|
w_no_slowdown.fetch_add(1);
|
|
if (w_slowdown_set.size() +
|
|
(unsigned long)(w_no_slowdown.load(std::memory_order_relaxed) +
|
|
wait_count_db) ==
|
|
(unsigned long)(2 * num_dbs + 1)) {
|
|
TEST_SYNC_POINT(
|
|
"DBWriteBufferManagerTest::SharedWriteBufferAcrossCFs:0");
|
|
}
|
|
}
|
|
};
|
|
|
|
// Flow:
|
|
// first thread will write but will be blocked (as Flush will on hold,
|
|
// buffer_size_ has exceeded, thus will create stall in effect).
|
|
// |
|
|
// |
|
|
// multiple writer threads will be created to write across multiple columns
|
|
// of db_ and different DBs with different values of
|
|
// WriteOptions.no_slowdown. Some of them will be blocked and some of them
|
|
// will return with Incomplete status.
|
|
// |
|
|
// |
|
|
// Last writer thread will write and when its blocked/return it will signal
|
|
// Flush to continue to clear the stall.
|
|
threads.emplace_back(write_slow_down, db_);
|
|
// Wait untill first thread writing to DB is blocked and then
|
|
// create the multiple writers.
|
|
{
|
|
InstrumentedMutexLock lock(&mutex);
|
|
while (wait_count_db != 1) {
|
|
cv.Wait();
|
|
}
|
|
}
|
|
|
|
for (int i = 0; i < num_dbs; i += 2) {
|
|
// Write to multiple columns of db_.
|
|
writer_threads.emplace_back(write_slow_down, db_);
|
|
writer_threads.emplace_back(write_no_slow_down, db_);
|
|
// Write to different DBs.
|
|
threads.emplace_back(write_slow_down, dbs[i]);
|
|
threads.emplace_back(write_no_slow_down, dbs[i + 1]);
|
|
}
|
|
|
|
for (auto& t : threads) {
|
|
t.join();
|
|
}
|
|
|
|
for (auto& t : writer_threads) {
|
|
t.join();
|
|
}
|
|
|
|
ASSERT_TRUE(s1);
|
|
ASSERT_TRUE(s2);
|
|
// Number of DBs blocked.
|
|
ASSERT_EQ((num_dbs / 2) + 1, wait_count_db);
|
|
// Number of writer threads writing to db_ blocked from getting added to the
|
|
// queue.
|
|
ASSERT_EQ(w_slowdown_set.size(), num_dbs / 2);
|
|
// Number of threads with WriteOptions.no_slowdown = true.
|
|
ASSERT_EQ(w_no_slowdown.load(std::memory_order_relaxed), num_dbs);
|
|
|
|
// Clean up DBs.
|
|
for (int i = 0; i < num_dbs; i++) {
|
|
ASSERT_OK(dbs[i]->Close());
|
|
ASSERT_OK(DestroyDB(dbnames[i], options));
|
|
delete dbs[i];
|
|
}
|
|
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
// Tests a `WriteBufferManager` constructed with `allow_stall == false` does not
|
|
// thrash memtable switching when full and a CF receives multiple writes.
|
|
// Instead, we expect to switch a CF's memtable for flush only when that CF does
|
|
// not have any pending or running flush.
|
|
//
|
|
// This test uses multiple DBs each with a single CF instead of a single DB
|
|
// with multiple CFs. That way we can control which CF is considered for switch
|
|
// by writing to that CF's DB.
|
|
//
|
|
// Not supported in LITE mode due to `GetProperty()` unavailable.
|
|
TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
|
|
Options options = CurrentOptions();
|
|
options.arena_block_size = 4 << 10; // 4KB
|
|
options.write_buffer_size = 1 << 20; // 1MB
|
|
std::shared_ptr<Cache> cache =
|
|
NewLRUCache(4 << 20 /* capacity (4MB) */, 2 /* num_shard_bits */);
|
|
ASSERT_LT(cache->GetUsage(), 256 << 10 /* 256KB */);
|
|
cost_cache_ = GetParam();
|
|
if (cost_cache_) {
|
|
options.write_buffer_manager.reset(new WriteBufferManager(
|
|
512 << 10 /* buffer_size (512KB) */, cache, false /* allow_stall */));
|
|
} else {
|
|
options.write_buffer_manager.reset(
|
|
new WriteBufferManager(512 << 10 /* buffer_size (512KB) */,
|
|
nullptr /* cache */, false /* allow_stall */));
|
|
}
|
|
|
|
Reopen(options);
|
|
std::string dbname = test::PerThreadDBPath("db_shared_wbm_db");
|
|
DB* shared_wbm_db = nullptr;
|
|
|
|
ASSERT_OK(DestroyDB(dbname, options));
|
|
ASSERT_OK(DB::Open(options, dbname, &shared_wbm_db));
|
|
|
|
// The last write will make WBM need flush, but it won't flush yet.
|
|
ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions()));
|
|
ASSERT_FALSE(options.write_buffer_manager->ShouldFlush());
|
|
ASSERT_OK(Put(Key(1), DummyString(256 << 10 /* 256KB */), WriteOptions()));
|
|
ASSERT_TRUE(options.write_buffer_manager->ShouldFlush());
|
|
|
|
// Flushes will be pending, not running because flush threads are blocked.
|
|
test::SleepingBackgroundTask sleeping_task_high;
|
|
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
|
&sleeping_task_high, Env::Priority::HIGH);
|
|
|
|
for (int i = 0; i < 3; ++i) {
|
|
ASSERT_OK(
|
|
shared_wbm_db->Put(WriteOptions(), Key(1), DummyString(1 /* len */)));
|
|
std::string prop;
|
|
ASSERT_TRUE(
|
|
shared_wbm_db->GetProperty("rocksdb.num-immutable-mem-table", &prop));
|
|
ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop);
|
|
ASSERT_TRUE(
|
|
shared_wbm_db->GetProperty("rocksdb.mem-table-flush-pending", &prop));
|
|
ASSERT_EQ(std::to_string(i > 0 ? 1 : 0), prop);
|
|
}
|
|
|
|
// Clean up DBs.
|
|
sleeping_task_high.WakeUp();
|
|
sleeping_task_high.WaitUntilDone();
|
|
ASSERT_OK(shared_wbm_db->Close());
|
|
ASSERT_OK(DestroyDB(dbname, options));
|
|
delete shared_wbm_db;
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Instantiate every TEST_P above for both values of the bool parameter, which
// the tests copy into `cost_cache_`.
INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest,
                        DBWriteBufferManagerTest,
                        testing::Bool());
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
RegisterCustomObjects(argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|