rocksdb/db/db_write_test.cc
Peter Dillinger e168c1b1a4 Use FaultInjectionTestFS in DBWriteTest.LockWALInEffect (#11271)
Summary:
Existing use of FaultInjectionTestEnv shows rare TSAN errors with parallel Sync and Flush. This appears to be fixed in FaultInjectionTestFS. (Sigh, code duplication and divergence.)

Example failure:
https://app.circleci.com/pipelines/github/facebook/rocksdb/24631/workflows/fc2a66f0-f21c-48d6-a944-3885bcff50a4/jobs/571928

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11271

Test Plan: wasn't able to reproduce locally but stress tested the updated test with gtest-parallel -r1000 and TSAN.

Reviewed By: ajkr

Differential Revision: D43779477

Pulled By: pdillinger

fbshipit-source-id: a019b0f1d4045a26a15ab08aab63828a398f6d3e
2023-03-05 08:21:16 -08:00

789 lines
26 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <atomic>
#include <cstdint>
#include <fstream>
#include <memory>
#include <thread>
#include <vector>
#include "db/db_test_util.h"
#include "db/write_batch_internal.h"
#include "db/write_thread.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/fault_injection_env.h"
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
// Test variations of WriteImpl.
class DBWriteTest : public DBTestBase, public testing::WithParamInterface<int> {
public:
DBWriteTest() : DBTestBase("db_write_test", /*env_do_fsync=*/true) {}
Options GetOptions() { return DBTestBase::GetOptions(GetParam()); }
void Open() { DBTestBase::Reopen(GetOptions()); }
};
class DBWriteTestUnparameterized : public DBTestBase {
public:
explicit DBWriteTestUnparameterized()
: DBTestBase("pipelined_write_test", /*env_do_fsync=*/false) {}
};
// It is invalid to do sync write while disabling WAL.
TEST_P(DBWriteTest, SyncAndDisableWAL) {
WriteOptions write_options;
write_options.sync = true;
write_options.disableWAL = true;
ASSERT_TRUE(dbfull()->Put(write_options, "foo", "bar").IsInvalidArgument());
WriteBatch batch;
ASSERT_OK(batch.Put("foo", "bar"));
ASSERT_TRUE(dbfull()->Write(write_options, &batch).IsInvalidArgument());
}
TEST_P(DBWriteTest, WriteStallRemoveNoSlowdownWrite) {
Options options = GetOptions();
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
4;
std::vector<port::Thread> threads;
std::atomic<int> thread_num(0);
port::Mutex mutex;
port::CondVar cv(&mutex);
// Guarded by mutex
int writers = 0;
Reopen(options);
std::function<void()> write_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = false;
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
Status s = dbfull()->Put(wo, key, "bar");
ASSERT_TRUE(s.ok() || s.IsIncomplete());
};
std::function<void(void*)> unblock_main_thread_func = [&](void*) {
mutex.Lock();
++writers;
cv.SignalAll();
mutex.Unlock();
};
// Create 3 L0 files and schedule 4th without waiting
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:1",
"DBImpl::BackgroundCallFlush:start"},
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:2",
"DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"},
// Make compaction start wait for the write stall to be detected and
// implemented by a write group leader
{"DBWriteTest::WriteStallRemoveNoSlowdownWrite:3",
"BackgroundCallCompaction:0"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Schedule creation of 4th L0 file without waiting. This will seal the
// memtable and then wait for a sync point before writing the file. We need
// to do it this way because SwitchMemtable() needs to enter the
// write_thread
FlushOptions fopt;
fopt.wait = false;
ASSERT_OK(dbfull()->Flush(fopt));
// Create a mix of slowdown/no_slowdown write threads
mutex.Lock();
// First leader
threads.emplace_back(write_slowdown_func);
while (writers != 1) {
cv.Wait();
}
// Second leader. Will stall writes
// Build a writers list with no slowdown in the middle:
// +-------------+
// | slowdown +<----+ newest
// +--+----------+
// |
// v
// +--+----------+
// | no slowdown |
// +--+----------+
// |
// v
// +--+----------+
// | slowdown +
// +-------------+
threads.emplace_back(write_slowdown_func);
while (writers != 2) {
cv.Wait();
}
threads.emplace_back(write_no_slowdown_func);
while (writers != 3) {
cv.Wait();
}
threads.emplace_back(write_slowdown_func);
while (writers != 4) {
cv.Wait();
}
mutex.Unlock();
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:1");
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
// This would have triggered a write stall. Unblock the write group leader
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:2");
// The leader is going to create missing newer links. When the leader
// finishes, the next leader is going to delay writes and fail writers with
// no_slowdown
TEST_SYNC_POINT("DBWriteTest::WriteStallRemoveNoSlowdownWrite:3");
for (auto& t : threads) {
t.join();
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBWriteTest, WriteThreadHangOnWriteStall) {
Options options = GetOptions();
options.level0_stop_writes_trigger = options.level0_slowdown_writes_trigger =
4;
std::vector<port::Thread> threads;
std::atomic<int> thread_num(0);
port::Mutex mutex;
port::CondVar cv(&mutex);
// Guarded by mutex
int writers = 0;
Reopen(options);
std::function<void()> write_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = false;
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
};
std::function<void()> write_no_slowdown_func = [&]() {
int a = thread_num.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
wo.no_slowdown = true;
Status s = dbfull()->Put(wo, key, "bar");
ASSERT_TRUE(s.ok() || s.IsIncomplete());
};
std::function<void(void*)> unblock_main_thread_func = [&](void*) {
mutex.Lock();
++writers;
cv.SignalAll();
mutex.Unlock();
};
// Create 3 L0 files and schedule 4th without waiting
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ASSERT_OK(Flush());
ASSERT_OK(Put("foo" + std::to_string(thread_num.fetch_add(1)), "bar"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Start", unblock_main_thread_func);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBWriteTest::WriteThreadHangOnWriteStall:1",
"DBImpl::BackgroundCallFlush:start"},
{"DBWriteTest::WriteThreadHangOnWriteStall:2",
"DBImpl::WriteImpl:BeforeLeaderEnters"},
// Make compaction start wait for the write stall to be detected and
// implemented by a write group leader
{"DBWriteTest::WriteThreadHangOnWriteStall:3",
"BackgroundCallCompaction:0"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Schedule creation of 4th L0 file without waiting. This will seal the
// memtable and then wait for a sync point before writing the file. We need
// to do it this way because SwitchMemtable() needs to enter the
// write_thread
FlushOptions fopt;
fopt.wait = false;
ASSERT_OK(dbfull()->Flush(fopt));
// Create a mix of slowdown/no_slowdown write threads
mutex.Lock();
// First leader
threads.emplace_back(write_slowdown_func);
while (writers != 1) {
cv.Wait();
}
// Second leader. Will stall writes
threads.emplace_back(write_slowdown_func);
threads.emplace_back(write_no_slowdown_func);
threads.emplace_back(write_slowdown_func);
threads.emplace_back(write_no_slowdown_func);
threads.emplace_back(write_slowdown_func);
while (writers != 6) {
cv.Wait();
}
mutex.Unlock();
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:1");
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(nullptr));
// This would have triggered a write stall. Unblock the write group leader
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:2");
// The leader is going to create missing newer links. When the leader
// finishes, the next leader is going to delay writes and fail writers with
// no_slowdown
TEST_SYNC_POINT("DBWriteTest::WriteThreadHangOnWriteStall:3");
for (auto& t : threads) {
t.join();
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) {
constexpr int kNumThreads = 5;
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
std::atomic<int> ready_count{0};
std::atomic<int> leader_count{0};
std::vector<port::Thread> threads;
mock_env->SetFilesystemActive(false);
// Wait until all threads linked to write threads, to make sure
// all threads join the same batch group.
SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
ready_count++;
auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
if (w->state == WriteThread::STATE_GROUP_LEADER) {
leader_count++;
while (ready_count < kNumThreads) {
// busy waiting
}
}
});
SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < kNumThreads; i++) {
threads.push_back(port::Thread(
[&](int index) {
// All threads should fail.
auto res = Put("key" + std::to_string(index), "value");
if (options.manual_wal_flush) {
ASSERT_TRUE(res.ok());
// we should see fs error when we do the flush
// TSAN reports a false alarm for lock-order-inversion but Open and
// FlushWAL are not run concurrently. Disabling this until TSAN is
// fixed.
// res = dbfull()->FlushWAL(false);
// ASSERT_FALSE(res.ok());
} else {
ASSERT_FALSE(res.ok());
}
},
i));
}
for (int i = 0; i < kNumThreads; i++) {
threads[i].join();
}
ASSERT_EQ(1, leader_count);
// The Failed PUT operations can cause a BG error to be set.
// Mark it as Checked for the ASSERT_STATUS_CHECKED
dbfull()->Resume().PermitUncheckedError();
// Close before mock_env destruct.
Close();
}
TEST_F(DBWriteTestUnparameterized, PipelinedWriteRace) {
// This test was written to trigger a race in ExitAsBatchGroupLeader in case
// enable_pipelined_write_ was true.
// Writers for which ShouldWriteToMemtable() evaluates to false are removed
// from the write_group via CompleteFollower/ CompleteLeader. Writers in the
// middle of the group are fully unlinked, but if that writers is the
// last_writer, then we did not update the predecessor's link_older, i.e.,
// this writer was still reachable via newest_writer_.
//
// But the problem was, that CompleteFollower already wakes up the thread
// owning that writer before the writer has been removed. This resulted in a
// race - if the leader thread was fast enough, then everything was fine.
// However, if the woken up thread finished the current write operation and
// then performed yet another write, then a new writer instance was added
// to newest_writer_. It is possible that the new writer is located on the
// same address on stack, and if this happened, then we had a problem,
// because the old code tried to find the last_writer in the list to unlink
// it, which in this case produced a cycle in the list.
// Whether two invocations of PipelinedWriteImpl() by the same thread actually
// allocate the writer on the same address depends on the OS and/or compiler,
// so it is rather hard to create a deterministic test for this.
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.enable_pipelined_write = true;
std::vector<port::Thread> threads;
std::atomic<int> write_counter{0};
std::atomic<int> active_writers{0};
std::atomic<bool> second_write_starting{false};
std::atomic<bool> second_write_in_progress{false};
std::atomic<WriteThread::Writer*> leader{nullptr};
std::atomic<bool> finished_WAL_write{false};
DestroyAndReopen(options);
auto write_one_doc = [&]() {
int a = write_counter.fetch_add(1);
std::string key = "foo" + std::to_string(a);
WriteOptions wo;
ASSERT_OK(dbfull()->Put(wo, key, "bar"));
--active_writers;
};
auto write_two_docs = [&]() {
write_one_doc();
second_write_starting = true;
write_one_doc();
};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::JoinBatchGroup:Wait", [&](void* arg) {
if (second_write_starting.load()) {
second_write_in_progress = true;
return;
}
auto* w = reinterpret_cast<WriteThread::Writer*>(arg);
if (w->state == WriteThread::STATE_GROUP_LEADER) {
active_writers++;
if (leader.load() == nullptr) {
leader.store(w);
while (active_writers.load() < 2) {
// wait for another thread to join the write_group
}
}
} else {
// we disable the memtable for all followers so that they they are
// removed from the write_group before enqueuing it for the memtable
// write
w->disable_memtable = true;
active_writers++;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::ExitAsBatchGroupLeader:Start", [&](void* arg) {
auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
if (wg->leader == leader && !finished_WAL_write) {
finished_WAL_write = true;
while (active_writers.load() < 3) {
// wait for the new writer to be enqueued
}
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"WriteThread::ExitAsBatchGroupLeader:AfterCompleteWriters",
[&](void* arg) {
auto* wg = reinterpret_cast<WriteThread::WriteGroup*>(arg);
if (wg->leader == leader) {
while (!second_write_in_progress.load()) {
// wait for the old follower thread to start the next write
}
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// start leader + one follower
threads.emplace_back(write_one_doc);
while (leader.load() == nullptr) {
// wait for leader
}
// we perform two writes in the follower, so that for the second write
// the thread reinserts a Writer with the same address
threads.emplace_back(write_two_docs);
// wait for the leader to enter ExitAsBatchGroupLeader
while (!finished_WAL_write.load()) {
// wait for write_group to have finished the WAL writes
}
// start another writer thread to be enqueued before the leader can
// complete the writers from its write_group
threads.emplace_back(write_one_doc);
for (auto& t : threads) {
t.join();
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_P(DBWriteTest, ManualWalFlushInEffect) {
Options options = GetOptions();
Reopen(options);
// try the 1st WAL created during open
ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
// try the 2nd wal created during SwitchWAL
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_TRUE(Put("key" + std::to_string(0), "value").ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
}
TEST_P(DBWriteTest, UnflushedPutRaceWithTrackedWalSync) {
// Repro race condition bug where unflushed WAL data extended the synced size
// recorded to MANIFEST despite being unrecoverable.
Options options = GetOptions();
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
options.env = fault_env.get();
options.manual_wal_flush = true;
options.track_and_verify_wals_in_manifest = true;
Reopen(options);
ASSERT_OK(Put("key1", "val1"));
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncWAL:Begin",
[this](void* /* arg */) { ASSERT_OK(Put("key2", "val2")); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->FlushWAL(true /* sync */));
// Ensure callback ran.
ASSERT_EQ("val2", Get("key2"));
Close();
// Simulate full loss of unsynced data. This drops "key2" -> "val2" from the
// DB WAL.
fault_env->DropUnsyncedFileData();
Reopen(options);
// Need to close before `fault_env` goes out of scope.
Close();
}
TEST_P(DBWriteTest, InactiveWalFullySyncedBeforeUntracked) {
// Repro bug where a WAL is appended and switched after
// `FlushWAL(true /* sync */)`'s sync finishes and before it untracks fully
// synced inactive logs. Previously such a WAL would be wrongly untracked
// so the final append would never be synced.
Options options = GetOptions();
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(env_));
options.env = fault_env.get();
Reopen(options);
ASSERT_OK(Put("key1", "val1"));
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncWAL:BeforeMarkLogsSynced:1", [this](void* /* arg */) {
ASSERT_OK(Put("key2", "val2"));
ASSERT_OK(dbfull()->TEST_SwitchMemtable());
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->FlushWAL(true /* sync */));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(Put("key3", "val3"));
ASSERT_OK(db_->FlushWAL(true /* sync */));
Close();
// Simulate full loss of unsynced data. This should drop nothing since we did
// `FlushWAL(true /* sync */)` before `Close()`.
fault_env->DropUnsyncedFileData();
Reopen(options);
ASSERT_EQ("val1", Get("key1"));
ASSERT_EQ("val2", Get("key2"));
ASSERT_EQ("val3", Get("key3"));
// Need to close before `fault_env` goes out of scope.
Close();
}
TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) {
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_));
Options options = GetOptions();
options.env = mock_env.get();
Reopen(options);
for (int i = 0; i < 2; i++) {
// Forcibly fail WAL write for the first Put only. Subsequent Puts should
// fail due to read-only mode
mock_env->SetFilesystemActive(i != 0);
auto res = Put("key" + std::to_string(i), "value");
// TSAN reports a false alarm for lock-order-inversion but Open and
// FlushWAL are not run concurrently. Disabling this until TSAN is
// fixed.
/*
if (options.manual_wal_flush && i == 0) {
// even with manual_wal_flush the 2nd Put should return error because of
// the read-only mode
ASSERT_TRUE(res.ok());
// we should see fs error when we do the flush
res = dbfull()->FlushWAL(false);
}
*/
if (!options.manual_wal_flush) {
ASSERT_NOK(res);
} else {
ASSERT_OK(res);
}
}
// Close before mock_env destruct.
Close();
}
TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
Random rnd(301);
std::unique_ptr<FaultInjectionTestEnv> mock_env(
new FaultInjectionTestEnv(env_));
Options options = GetOptions();
options.env = mock_env.get();
options.writable_file_max_buffer_size = 4 * 1024 * 1024;
options.write_buffer_size = 3 * 512 * 1024;
options.wal_bytes_per_sync = 256 * 1024;
options.manual_wal_flush = true;
Reopen(options);
mock_env->SetFilesystemActive(false, Status::IOError("Not active"));
Status s;
for (int i = 0; i < 4 * 512; ++i) {
s = Put(Key(i), rnd.RandomString(1024));
if (!s.ok()) {
break;
}
}
ASSERT_EQ(s.severity(), Status::Severity::kFatalError);
mock_env->SetFilesystemActive(true);
// Close before mock_env destruct.
Close();
}
// Test that db->LockWAL() flushes the WAL after locking, which can fail
TEST_P(DBWriteTest, LockWALInEffect) {
if (mem_env_ || encrypted_env_) {
ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
return;
}
Options options = GetOptions();
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
options.env = fault_fs_env.get();
options.disable_auto_compactions = true;
options.paranoid_checks = false;
Reopen(options);
// try the 1st WAL created during open
ASSERT_OK(Put("key0", "value"));
ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
ASSERT_OK(db_->LockWAL());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
ASSERT_OK(db_->UnlockWAL());
// try the 2nd wal created during SwitchWAL
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(Put("key1", "value"));
ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
ASSERT_OK(db_->LockWAL());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
ASSERT_OK(db_->UnlockWAL());
// Fail the WAL flush if applicable
fault_fs->SetFilesystemActive(false);
Status s = Put("key2", "value");
if (options.manual_wal_flush) {
ASSERT_OK(s);
// I/O failure
ASSERT_NOK(db_->LockWAL());
// Should not need UnlockWAL after LockWAL fails
} else {
ASSERT_NOK(s);
ASSERT_OK(db_->LockWAL());
ASSERT_OK(db_->UnlockWAL());
}
fault_fs->SetFilesystemActive(true);
// Writes should work again
ASSERT_OK(Put("key3", "value"));
ASSERT_EQ(Get("key3"), "value");
// Should be extraneous, but allowed
ASSERT_NOK(db_->UnlockWAL());
// Close before mock_env destruct.
Close();
}
TEST_P(DBWriteTest, LockWALConcurrentRecursive) {
Options options = GetOptions();
Reopen(options);
ASSERT_OK(Put("k1", "val"));
ASSERT_OK(db_->LockWAL()); // 0 -> 1
auto frozen_seqno = db_->GetLatestSequenceNumber();
std::atomic<bool> t1_completed{false};
port::Thread t1{[&]() {
// Won't finish until WAL unlocked
ASSERT_OK(Put("k1", "val2"));
t1_completed = true;
}};
ASSERT_OK(db_->LockWAL()); // 1 -> 2
// Read-only ops are OK
ASSERT_EQ(Get("k1"), "val");
{
std::vector<LiveFileStorageInfo> files;
LiveFilesStorageInfoOptions lf_opts;
// A DB flush could deadlock
lf_opts.wal_size_for_flush = UINT64_MAX;
ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files));
}
port::Thread t2{[&]() {
ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2
}};
ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2
// Give t1 an extra chance to jump in case of bug
std::this_thread::yield();
t2.join();
ASSERT_FALSE(t1_completed.load());
// Should now have 2 outstanding LockWAL
ASSERT_EQ(Get("k1"), "val");
ASSERT_OK(db_->UnlockWAL()); // 2 -> 1
ASSERT_FALSE(t1_completed.load());
ASSERT_EQ(Get("k1"), "val");
ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber());
// Ensure final Unlock is concurrency safe and extra Unlock is safe but
// non-OK
std::atomic<int> unlock_ok{0};
port::Thread t3{[&]() {
if (db_->UnlockWAL().ok()) {
unlock_ok++;
}
ASSERT_OK(db_->LockWAL());
if (db_->UnlockWAL().ok()) {
unlock_ok++;
}
}};
if (db_->UnlockWAL().ok()) {
unlock_ok++;
}
t3.join();
// There was one extra unlock, so just one non-ok
ASSERT_EQ(unlock_ok.load(), 2);
// Write can proceed
t1.join();
ASSERT_TRUE(t1_completed.load());
ASSERT_EQ(Get("k1"), "val2");
// And new writes
ASSERT_OK(Put("k2", "val"));
ASSERT_EQ(Get("k2"), "val");
}
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
Options options = GetOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kAll);
Reopen(options);
std::string wal_key_prefix = "WAL_KEY_";
std::string no_wal_key_prefix = "K_";
// 100 KB value each for NO-WAL operation
std::string no_wal_value(1024 * 100, 'X');
// 1B value each for WAL operation
std::string wal_value = "0";
std::thread threads[10];
for (int t = 0; t < 10; t++) {
threads[t] = std::thread([t, wal_key_prefix, wal_value, no_wal_key_prefix,
no_wal_value, this] {
for (int i = 0; i < 10; i++) {
ROCKSDB_NAMESPACE::WriteOptions write_option_disable;
write_option_disable.disableWAL = true;
ROCKSDB_NAMESPACE::WriteOptions write_option_default;
std::string no_wal_key =
no_wal_key_prefix + std::to_string(t) + "_" + std::to_string(i);
ASSERT_OK(this->Put(no_wal_key, no_wal_value, write_option_disable));
std::string wal_key =
wal_key_prefix + std::to_string(i) + "_" + std::to_string(i);
ASSERT_OK(this->Put(wal_key, wal_value, write_option_default));
ASSERT_OK(dbfull()->SyncWAL());
}
return;
});
}
for (auto& t : threads) {
t.join();
}
uint64_t bytes_num = options.statistics->getTickerCount(
ROCKSDB_NAMESPACE::Tickers::WAL_FILE_BYTES);
// written WAL size should less than 100KB (even included HEADER & FOOTER
// overhead)
ASSERT_LE(bytes_num, 1024 * 100);
}
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
DBTestBase::kPipelinedWrite));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}