Disallow memtable flush and sst ingest while WAL is locked (#12652)

Summary:
We recently noticed that some memtable flushed and file
ingestions could proceed during LockWAL, in violation of its stated
contract. (Note: we aren't 100% sure its actually needed by MySQL, but
we want it to be in a clean state nonetheless.)

Despite earlier skepticism that this could be done safely (https://github.com/facebook/rocksdb/issues/12666), I
found a place to wait to wait for LockWAL to be cleared before allowing
these operations to proceed: WaitForPendingWrites()

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12652

Test Plan:
Added to unit tests. Extended how db_stress validates LockWAL
and re-enabled combination of ingestion and LockWAL in crash test, in
follow-up to https://github.com/facebook/rocksdb/issues/12642

Ran blackbox_crash_test for a long while with relevant features
amplified.

Suggested follow-up: fix FaultInjectionTestFS to report file sizes
consistent with what the user has requested to be flushed.

Reviewed By: jowlyzhang

Differential Revision: D57622142

Pulled By: pdillinger

fbshipit-source-id: aef265fce69465618974b4ec47f4636257c676ce
This commit is contained in:
Peter Dillinger 2024-05-21 10:17:34 -07:00 committed by Facebook GitHub Bot
parent d7b938882e
commit d89ab23bec
9 changed files with 157 additions and 54 deletions

View File

@ -229,14 +229,20 @@ Status DBImpl::GetLiveFilesStorageInfo(
// metadata. // metadata.
mutex_.Lock(); mutex_.Lock();
if (flush_memtable) { if (flush_memtable) {
bool wal_locked = lock_wal_count_ > 0;
if (wal_locked) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Can't FlushForGetLiveFiles while WAL is locked");
} else {
Status status = FlushForGetLiveFiles(); Status status = FlushForGetLiveFiles();
if (!status.ok()) { if (!status.ok()) {
mutex_.Unlock(); mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n", ROCKS_LOG_ERROR(immutable_db_options_.info_log,
status.ToString().c_str()); "Cannot Flush data %s\n", status.ToString().c_str());
return status; return status;
} }
} }
}
// Make a set of all of the live table and blob files // Make a set of all of the live table and blob files
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {

View File

@ -2054,18 +2054,23 @@ class DBImpl : public DB {
mutex_.Lock(); mutex_.Lock();
} }
if (!immutable_db_options_.unordered_write) { if (immutable_db_options_.unordered_write) {
// Then the writes are finished before the next write group starts
return;
}
// Wait for the ones who already wrote to the WAL to finish their // Wait for the ones who already wrote to the WAL to finish their
// memtable write. // memtable write.
if (pending_memtable_writes_.load() != 0) { if (pending_memtable_writes_.load() != 0) {
// XXX: suspicious wait while holding DB mutex?
std::unique_lock<std::mutex> guard(switch_mutex_); std::unique_lock<std::mutex> guard(switch_mutex_);
switch_cv_.wait(guard, switch_cv_.wait(guard,
[&] { return pending_memtable_writes_.load() == 0; }); [&] { return pending_memtable_writes_.load() == 0; });
} }
} else {
// (Writes are finished before the next write group starts.)
}
// Wait for any LockWAL to clear
while (lock_wal_count_ > 0) {
bg_cv_.Wait();
}
} }
// TaskType is used to identify tasks in thread-pool, currently only // TaskType is used to identify tasks in thread-pool, currently only

View File

@ -2156,6 +2156,8 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/,
// two_write_queues_ is true (This is to simplify the reasoning.) // two_write_queues_ is true (This is to simplify the reasoning.)
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(lock_wal_count_ == 0);
// TODO: plumb Env::IOActivity, Env::IOPriority // TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options; const ReadOptions read_options;
const WriteOptions write_options; const WriteOptions write_options;

View File

@ -666,11 +666,25 @@ TEST_P(DBWriteTest, LockWALInEffect) {
// try the 1st WAL created during open // try the 1st WAL created during open
ASSERT_OK(Put("key0", "value")); ASSERT_OK(Put("key0", "value"));
ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty()); ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
ASSERT_OK(db_->LockWAL()); ASSERT_OK(db_->LockWAL());
ASSERT_TRUE(dbfull()->WALBufferIsEmpty()); ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
uint64_t wal_num = dbfull()->TEST_GetCurrentLogNumber();
// Manual flush with wait=false should abruptly fail with TryAgain
FlushOptions flush_opts;
flush_opts.wait = false;
for (bool allow_write_stall : {true, false}) {
flush_opts.allow_write_stall = allow_write_stall;
ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());
}
ASSERT_EQ(wal_num, dbfull()->TEST_GetCurrentLogNumber());
ASSERT_OK(db_->UnlockWAL()); ASSERT_OK(db_->UnlockWAL());
// try the 2nd wal created during SwitchWAL
// try the 2nd wal created during SwitchWAL (not locked this time)
ASSERT_OK(dbfull()->TEST_SwitchWAL()); ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_NE(wal_num, dbfull()->TEST_GetCurrentLogNumber());
ASSERT_OK(Put("key1", "value")); ASSERT_OK(Put("key1", "value"));
ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty()); ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
ASSERT_OK(db_->LockWAL()); ASSERT_OK(db_->LockWAL());
@ -709,21 +723,57 @@ TEST_P(DBWriteTest, LockWALInEffect) {
} }
TEST_P(DBWriteTest, LockWALConcurrentRecursive) { TEST_P(DBWriteTest, LockWALConcurrentRecursive) {
// This is a micro-stress test of LockWAL and concurrency handling.
// It is considered the most convenient way to balance functional
// coverage and reproducibility (vs. the two extremes of (a) unit tests
// tailored to specific interleavings and (b) db_stress)
Options options = GetOptions(); Options options = GetOptions();
Reopen(options); Reopen(options);
ASSERT_OK(Put("k1", "val")); ASSERT_OK(Put("k1", "k1_orig"));
ASSERT_OK(db_->LockWAL()); // 0 -> 1 ASSERT_OK(db_->LockWAL()); // 0 -> 1
auto frozen_seqno = db_->GetLatestSequenceNumber(); auto frozen_seqno = db_->GetLatestSequenceNumber();
std::atomic<bool> t1_completed{false};
port::Thread t1{[&]() { std::string ingest_file = dbname_ + "/external.sst";
// Won't finish until WAL unlocked {
ASSERT_OK(Put("k1", "val2")); SstFileWriter sst_file_writer(EnvOptions(), options);
t1_completed = true; ASSERT_OK(sst_file_writer.Open(ingest_file));
ASSERT_OK(sst_file_writer.Put("k2", "k2_val"));
ExternalSstFileInfo external_info;
ASSERT_OK(sst_file_writer.Finish(&external_info));
}
AcqRelAtomic<bool> parallel_ingest_completed{false};
port::Thread parallel_ingest{[&]() {
IngestExternalFileOptions ingest_opts;
ingest_opts.move_files = true; // faster than copy
// Shouldn't finish until WAL unlocked
ASSERT_OK(db_->IngestExternalFile({ingest_file}, ingest_opts));
parallel_ingest_completed.Store(true);
}};
AcqRelAtomic<bool> flush_completed{false};
port::Thread parallel_flush{[&]() {
FlushOptions flush_opts;
// NB: Flush with wait=false case is tested above in LockWALInEffect
flush_opts.wait = true;
// allow_write_stall = true blocks in fewer cases
flush_opts.allow_write_stall = true;
// Shouldn't finish until WAL unlocked
ASSERT_OK(db_->Flush(flush_opts));
flush_completed.Store(true);
}};
AcqRelAtomic<bool> parallel_put_completed{false};
port::Thread parallel_put{[&]() {
// This can make certain failure scenarios more likely:
// sleep(1);
// Shouldn't finish until WAL unlocked
ASSERT_OK(Put("k1", "k1_mod"));
parallel_put_completed.Store(true);
}}; }};
ASSERT_OK(db_->LockWAL()); // 1 -> 2 ASSERT_OK(db_->LockWAL()); // 1 -> 2
// Read-only ops are OK // Read-only ops are OK
ASSERT_EQ(Get("k1"), "val"); ASSERT_EQ(Get("k1"), "k1_orig");
{ {
std::vector<LiveFileStorageInfo> files; std::vector<LiveFileStorageInfo> files;
LiveFilesStorageInfoOptions lf_opts; LiveFilesStorageInfoOptions lf_opts;
@ -732,29 +782,35 @@ TEST_P(DBWriteTest, LockWALConcurrentRecursive) {
ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files)); ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files));
} }
port::Thread t2{[&]() { port::Thread parallel_lock_wal{[&]() {
ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2 ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2
}}; }};
ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2 ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2
// Give t1 an extra chance to jump in case of bug // Give parallel_put an extra chance to jump in case of bug
std::this_thread::yield(); std::this_thread::yield();
t2.join(); parallel_lock_wal.join();
ASSERT_FALSE(t1_completed.load()); ASSERT_FALSE(parallel_put_completed.Load());
ASSERT_FALSE(parallel_ingest_completed.Load());
ASSERT_FALSE(flush_completed.Load());
// Should now have 2 outstanding LockWAL // Should now have 2 outstanding LockWAL
ASSERT_EQ(Get("k1"), "val"); ASSERT_EQ(Get("k1"), "k1_orig");
ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 ASSERT_OK(db_->UnlockWAL()); // 2 -> 1
ASSERT_FALSE(t1_completed.load()); ASSERT_FALSE(parallel_put_completed.Load());
ASSERT_EQ(Get("k1"), "val"); ASSERT_FALSE(parallel_ingest_completed.Load());
ASSERT_FALSE(flush_completed.Load());
ASSERT_EQ(Get("k1"), "k1_orig");
ASSERT_EQ(Get("k2"), "NOT_FOUND");
ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber()); ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber());
// Ensure final Unlock is concurrency safe and extra Unlock is safe but // Ensure final Unlock is concurrency safe and extra Unlock is safe but
// non-OK // non-OK
std::atomic<int> unlock_ok{0}; std::atomic<int> unlock_ok{0};
port::Thread t3{[&]() { port::Thread parallel_stuff{[&]() {
if (db_->UnlockWAL().ok()) { if (db_->UnlockWAL().ok()) {
unlock_ok++; unlock_ok++;
} }
@ -767,18 +823,23 @@ TEST_P(DBWriteTest, LockWALConcurrentRecursive) {
if (db_->UnlockWAL().ok()) { if (db_->UnlockWAL().ok()) {
unlock_ok++; unlock_ok++;
} }
t3.join(); parallel_stuff.join();
// There was one extra unlock, so just one non-ok // There was one extra unlock, so just one non-ok
ASSERT_EQ(unlock_ok.load(), 2); ASSERT_EQ(unlock_ok.load(), 2);
// Write can proceed // Write can proceed
t1.join(); parallel_put.join();
ASSERT_TRUE(t1_completed.load()); ASSERT_TRUE(parallel_put_completed.Load());
ASSERT_EQ(Get("k1"), "val2"); ASSERT_EQ(Get("k1"), "k1_mod");
parallel_ingest.join();
ASSERT_TRUE(parallel_ingest_completed.Load());
ASSERT_EQ(Get("k2"), "k2_val");
parallel_flush.join();
ASSERT_TRUE(flush_completed.Load());
// And new writes // And new writes
ASSERT_OK(Put("k2", "val")); ASSERT_OK(Put("k3", "val"));
ASSERT_EQ(Get("k2"), "val"); ASSERT_EQ(Get("k3"), "val");
} }
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {

View File

@ -674,10 +674,6 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
ASSERT_EQ(file3_info.smallest_key, Key(110)); ASSERT_EQ(file3_info.smallest_key, Key(110));
ASSERT_EQ(file3_info.largest_key, Key(124)); ASSERT_EQ(file3_info.largest_key, Key(124));
ASSERT_OK(dbfull()->LockWAL());
// TODO(FIXME): should not allow file ingestion.
// With below line, ingestion will block, without it, ingestion can go
// through. ASSERT_OK(dbfull()->Put(WriteOptions(), "vo", "vo"));
s = DeprecatedAddFile({file1}, true /* move file */); s = DeprecatedAddFile({file1}, true /* move file */);
ASSERT_OK(s) << s.ToString(); ASSERT_OK(s) << s.ToString();
ASSERT_EQ(Status::NotFound(), env_->FileExists(file1)); ASSERT_EQ(Status::NotFound(), env_->FileExists(file1));

View File

@ -958,12 +958,48 @@ void StressTest::OperateDb(ThreadState* thread) {
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str()); fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str());
} else { } else {
// Verify no writes during LockWAL
auto old_seqno = db_->GetLatestSequenceNumber(); auto old_seqno = db_->GetLatestSequenceNumber();
// And also that WAL is not changed during LockWAL()
std::unique_ptr<LogFile> old_wal;
s = db_->GetCurrentWalFile(&old_wal);
if (!s.ok()) {
fprintf(stderr, "GetCurrentWalFile() failed: %s\n",
s.ToString().c_str());
} else {
// Yield for a while // Yield for a while
do { do {
std::this_thread::yield(); std::this_thread::yield();
} while (thread->rand.OneIn(2)); } while (thread->rand.OneIn(2));
// Latest seqno should not have changed // Current WAL and size should not have changed
std::unique_ptr<LogFile> new_wal;
s = db_->GetCurrentWalFile(&new_wal);
if (!s.ok()) {
fprintf(stderr, "GetCurrentWalFile() failed: %s\n",
s.ToString().c_str());
} else {
if (old_wal->LogNumber() != new_wal->LogNumber()) {
fprintf(stderr,
"Failed: WAL number changed during LockWAL(): %" PRIu64
" to %" PRIu64 "\n",
old_wal->LogNumber(), new_wal->LogNumber());
}
// FIXME: FaultInjectionTestFS does not report file sizes that
// reflect what has been flushed. Either that needs to be fixed
// or GetSortedWals/GetLiveWalFile need to stop relying on
// asking the FS for sizes.
if (!fault_fs_guard &&
old_wal->SizeFileBytes() != new_wal->SizeFileBytes()) {
fprintf(stderr,
"Failed: WAL %" PRIu64
" size changed during LockWAL(): %" PRIu64
" to %" PRIu64 "\n",
old_wal->LogNumber(), old_wal->SizeFileBytes(),
new_wal->SizeFileBytes());
}
}
}
// Verify no writes during LockWAL
auto new_seqno = db_->GetLatestSequenceNumber(); auto new_seqno = db_->GetLatestSequenceNumber();
if (old_seqno != new_seqno) { if (old_seqno != new_seqno) {
fprintf( fprintf(
@ -971,6 +1007,7 @@ void StressTest::OperateDb(ThreadState* thread) {
"Failure: latest seqno changed from %u to %u with WAL locked\n", "Failure: latest seqno changed from %u to %u with WAL locked\n",
(unsigned)old_seqno, (unsigned)new_seqno); (unsigned)old_seqno, (unsigned)new_seqno);
} }
// Verification done. Now unlock WAL
s = db_->UnlockWAL(); s = db_->UnlockWAL();
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "UnlockWAL() failed: %s\n", s.ToString().c_str()); fprintf(stderr, "UnlockWAL() failed: %s\n", s.ToString().c_str());
@ -2468,6 +2505,7 @@ void StressTest::TestPromoteL0(ThreadState* thread,
Status StressTest::TestFlush(const std::vector<int>& rand_column_families) { Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
FlushOptions flush_opts; FlushOptions flush_opts;
assert(flush_opts.wait);
if (FLAGS_atomic_flush) { if (FLAGS_atomic_flush) {
return db_->Flush(flush_opts, column_families_); return db_->Flush(flush_opts, column_families_);
} }

View File

@ -1679,8 +1679,8 @@ class DB {
// Freezes the logical state of the DB (by stopping writes), and if WAL is // Freezes the logical state of the DB (by stopping writes), and if WAL is
// enabled, ensures that state has been flushed to DB files (as in // enabled, ensures that state has been flushed to DB files (as in
// FlushWAL()). This can be used for taking a Checkpoint at a known DB // FlushWAL()). This can be used for taking a Checkpoint at a known DB
// state, though the user must use options to insure no DB flush is invoked // state, though while the WAL is locked, flushes as part of CreateCheckpoint
// in this frozen state. Other operations allowed on a "read only" DB should // and simiar are skipped. Other operations allowed on a "read only" DB should
// work while frozen. Each LockWAL() call that returns OK must eventually be // work while frozen. Each LockWAL() call that returns OK must eventually be
// followed by a corresponding call to UnlockWAL(). Where supported, non-OK // followed by a corresponding call to UnlockWAL(). Where supported, non-OK
// status is generally only possible with some kind of corruption or I/O // status is generally only possible with some kind of corruption or I/O

View File

@ -883,12 +883,6 @@ def finalize_and_sanitize(src_params):
elif (dest_params.get("use_put_entity_one_in") > 1 and elif (dest_params.get("use_put_entity_one_in") > 1 and
dest_params.get("use_timed_put_one_in") == 1): dest_params.get("use_timed_put_one_in") == 1):
dest_params["use_timed_put_one_in"] = 3 dest_params["use_timed_put_one_in"] = 3
# TODO: re-enable this combination.
if dest_params.get("lock_wal_one_in") != 0 and dest_params["ingest_external_file_one_in"] != 0:
if random.choice([0, 1]) == 0:
dest_params["ingest_external_file_one_in"] = 0
else:
dest_params["lock_wal_one_in"] = 0
return dest_params return dest_params
def gen_cmd_params(args): def gen_cmd_params(args):

View File

@ -0,0 +1 @@
* While WAL is locked with LockWAL(), some operations like Flush() and IngestExternalFile() are now blocked as they should have been.