Refactor SyncWAL and SyncClosedLogs for code sharing (#12707)

Summary:
These functions were very similar and did not make sense for maintaining separately. This is not a pure refactor but I think bringing the behaviors closer together should reduce long term risk of unintentionally divergent behavior. This change is motivated by some forthcoming WAL handling fixes for Checkpoint and Backups.

* Sync() is always used on closed WALs, like the old SyncClosedWals. SyncWithoutFlush() is only used on the active (maybe) WAL. Perhaps SyncWithoutFlush() should be used whenever available, but I don't know which is preferred, as the previous state of the code was inconsistent.
* Syncing the WAL dir is selective based on need, like old SyncWAL, rather than done always like old SyncClosedLogs. This could be a performance improvement that was never applied to SyncClosedLogs but now is. We might still sync the dir more times than necessary in the case of parallel SyncWAL variants, but on a good FileSystem that's probably not too different performance-wise from us implementing something to have threads wait on each other.

Cosmetic changes:

* Rename internal function SyncClosedLogs to SyncClosedWals
* Merging the sync points into the common implementation between the two entry points isn't pretty, but should be fine.

Recommended follow-up:

* Clean up more confusing naming like log_dir_synced_

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12707

Test Plan: existing tests

Reviewed By: anand1976

Differential Revision: D57870856

Pulled By: pdillinger

fbshipit-source-id: 5455fba016d25dd5664fa41b253f18db2ca8919a
This commit is contained in:
Peter Dillinger 2024-05-30 14:53:13 -07:00 committed by Facebook GitHub Bot
parent 01179678b2
commit 7127119ae9
5 changed files with 120 additions and 146 deletions

View File

@ -86,8 +86,8 @@ TEST_F(DBFlushTest, SyncFail) {
options.env = fault_injection_env.get();
SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedWals:Start"},
{"DBImpl::SyncClosedWals:Failed", "DBFlushTest::SyncFail:2"}});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
@ -111,8 +111,8 @@ TEST_F(DBFlushTest, SyncSkip) {
Options options = CurrentOptions();
SyncPoint::GetInstance()->LoadDependency(
{{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"},
{"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}});
{{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedWals:Skip"},
{"DBImpl::SyncClosedWals:Skip", "DBFlushTest::SyncSkip:2"}});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
@ -2381,7 +2381,7 @@ TEST_F(DBFlushTest, PickRightMemtables) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
"DBImpl::SyncClosedWals:BeforeReLock", [&](void* /*arg*/) {
ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
auto* cfhi =
static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);

View File

@ -1564,62 +1564,104 @@ bool DBImpl::WALBufferIsEmpty() {
Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBImpl::SyncWAL:Begin");
autovector<log::Writer*, 1> logs_to_sync;
bool need_log_dir_sync;
uint64_t current_log_number;
WriteOptions write_options;
VersionEdit synced_wals;
Status s = SyncWalImpl(/*include_current_wal=*/true, write_options,
/*job_context=*/nullptr, &synced_wals,
/*error_recovery_in_prog=*/false);
if (s.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
s = ApplyWALToManifest(read_options, write_options, &synced_wals);
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
return s;
}
IOStatus DBImpl::SyncWalImpl(bool include_current_wal,
const WriteOptions& write_options,
JobContext* job_context, VersionEdit* synced_wals,
bool error_recovery_in_prog) {
autovector<log::Writer*, 1> wals_to_sync;
bool need_wal_dir_sync;
// Number of a WAL that was active at the start of call and maybe is by
// the end of the call.
uint64_t maybe_active_number;
// Sync WALs up to this number
uint64_t up_to_number;
{
InstrumentedMutexLock l(&log_write_mutex_);
assert(!logs_.empty());
// This SyncWAL() call only cares about logs up to this number.
current_log_number = logfile_number_;
maybe_active_number = logfile_number_;
up_to_number =
include_current_wal ? maybe_active_number : maybe_active_number - 1;
while (logs_.front().number <= current_log_number &&
logs_.front().IsSyncing()) {
while (logs_.front().number <= up_to_number && logs_.front().IsSyncing()) {
log_sync_cv_.Wait();
}
// First check that logs are safe to sync in background.
for (auto it = logs_.begin();
it != logs_.end() && it->number <= current_log_number; ++it) {
if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) {
return Status::NotSupported(
"SyncWAL() is not supported for this implementation of WAL file",
immutable_db_options_.allow_mmap_writes
? "try setting Options::allow_mmap_writes to false"
: Slice());
}
if (include_current_wal &&
!logs_.back().writer->file()->writable_file()->IsSyncThreadSafe()) {
return IOStatus::NotSupported(
"SyncWAL() is not supported for this implementation of WAL file",
immutable_db_options_.allow_mmap_writes
? "try setting Options::allow_mmap_writes to false"
: Slice());
}
for (auto it = logs_.begin();
it != logs_.end() && it->number <= current_log_number; ++it) {
it != logs_.end() && it->number <= up_to_number; ++it) {
auto& log = *it;
log.PrepareForSync();
logs_to_sync.push_back(log.writer);
wals_to_sync.push_back(log.writer);
}
need_log_dir_sync = !log_dir_synced_;
need_wal_dir_sync = !log_dir_synced_;
}
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
RecordTick(stats_, WAL_FILE_SYNCED);
Status status;
IOStatus io_s;
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
const WriteOptions write_options;
IOOptions opts;
io_s = WritableFileWriter::PrepareIOOptions(write_options, opts);
if (!io_s.ok()) {
status = io_s;
if (include_current_wal) {
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
}
RecordTick(stats_, WAL_FILE_SYNCED);
IOOptions opts;
IOStatus io_s = WritableFileWriter::PrepareIOOptions(write_options, opts);
if (io_s.ok()) {
for (log::Writer* log : logs_to_sync) {
io_s =
log->file()->SyncWithoutFlush(opts, immutable_db_options_.use_fsync);
for (log::Writer* log : wals_to_sync) {
if (job_context) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
}
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
if (log->get_log_number() >= maybe_active_number) {
assert(log->get_log_number() == maybe_active_number);
io_s = log->file()->SyncWithoutFlush(opts,
immutable_db_options_.use_fsync);
} else {
io_s = log->file()->Sync(opts, immutable_db_options_.use_fsync);
}
if (!io_s.ok()) {
status = io_s;
break;
}
// Normally the log file is closed when purging obsolete file, but if
// log recycling is enabled, the log file is closed here so that it
// can be reused.
if (log->get_log_number() < maybe_active_number &&
immutable_db_options_.recycle_log_file_num > 0) {
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
io_s = log->Close(write_options);
if (!io_s.ok()) {
break;
}
}
}
}
if (!io_s.ok()) {
@ -1629,31 +1671,28 @@ Status DBImpl::SyncWAL() {
// future writes
IOStatusCheck(io_s);
}
if (status.ok() && need_log_dir_sync) {
status = directories_.GetWalDir()->FsyncWithDirOptions(
if (io_s.ok() && need_wal_dir_sync) {
io_s = directories_.GetWalDir()->FsyncWithDirOptions(
IOOptions(), nullptr,
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
if (include_current_wal) {
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
VersionEdit synced_wals;
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
} else {
TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedWals:BeforeReLock",
/*arg=*/nullptr);
}
{
InstrumentedMutexLock l(&log_write_mutex_);
if (status.ok()) {
MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
if (io_s.ok()) {
MarkLogsSynced(up_to_number, need_wal_dir_sync, synced_wals);
} else {
MarkLogsNotSynced(current_log_number);
MarkLogsNotSynced(up_to_number);
}
}
if (status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
status = ApplyWALToManifest(read_options, write_options, &synced_wals);
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
return status;
return io_s;
}
Status DBImpl::ApplyWALToManifest(const ReadOptions& read_options,

View File

@ -1920,7 +1920,7 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);
IOStatus SyncClosedLogs(const WriteOptions& write_options,
IOStatus SyncClosedWals(const WriteOptions& write_options,
JobContext* job_context, VersionEdit* synced_wals,
bool error_recovery_in_prog);
@ -2255,6 +2255,11 @@ class DBImpl : public DB {
ColumnFamilyData* PickCompactionFromQueue(
std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
IOStatus SyncWalImpl(bool include_current_wal,
const WriteOptions& write_options,
JobContext* job_context, VersionEdit* synced_wals,
bool error_recovery_in_prog);
// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit);
Status ApplyWALToManifest(const ReadOptions& read_options,
@ -2579,7 +2584,7 @@ class DBImpl : public DB {
// 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by
// log_write_mutex_.
// 9. erase() by MarkLogsSynced() protected by log_write_mutex_.
// 10. read by SyncClosedLogs() protected by only log_write_mutex_. This can
// 10. read by SyncClosedWals() protected by only log_write_mutex_. This can
// happen in bg flush threads after DB::Open() returns success to
// applications.
// 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite()
@ -2592,7 +2597,7 @@ class DBImpl : public DB {
// 13. emplace_back() by SwitchMemtable() hold both mutex_ and
// log_write_mutex_. This happens in the write group leader. Can conflict
// with bg threads calling FindObsoleteFiles(), MarkLogsSynced(),
// SyncClosedLogs(), etc. as well as application threads calling
// SyncClosedWals(), etc. as well as application threads calling
// FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties
// require at least log_write_mutex_.
// 14. iteration called in WriteToWAL(write_group) protected by

View File

@ -116,89 +116,19 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT(
return true;
}
IOStatus DBImpl::SyncClosedLogs(const WriteOptions& write_options,
IOStatus DBImpl::SyncClosedWals(const WriteOptions& write_options,
JobContext* job_context,
VersionEdit* synced_wals,
bool error_recovery_in_prog) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
while (logs_.front().number < current_log_number &&
logs_.front().IsSyncing()) {
log_sync_cv_.Wait();
TEST_SYNC_POINT("DBImpl::SyncClosedWals:Start");
IOStatus io_s = SyncWalImpl(/*include_current_wal*/ false, write_options,
job_context, synced_wals, error_recovery_in_prog);
if (!io_s.ok()) {
TEST_SYNC_POINT("DBImpl::SyncClosedWals:Failed");
} else {
TEST_SYNC_POINT("DBImpl::SyncClosedWals:end");
}
for (auto it = logs_.begin();
it != logs_.end() && it->number < current_log_number; ++it) {
auto& log = *it;
log.PrepareForSync();
logs_to_sync.push_back(log.writer);
}
IOStatus io_s;
if (!logs_to_sync.empty()) {
log_write_mutex_.Unlock();
assert(job_context);
for (log::Writer* log : logs_to_sync) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
IOOptions io_options;
io_s = WritableFileWriter::PrepareIOOptions(write_options, io_options);
if (!io_s.ok()) {
break;
}
io_s = log->file()->Sync(io_options, immutable_db_options_.use_fsync);
if (!io_s.ok()) {
break;
}
if (immutable_db_options_.recycle_log_file_num > 0) {
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
// Normally the log file is closed when purging obsolete file, but if
// log recycling is enabled, the log file is closed here so that it
// can be reused.
io_s = log->Close(write_options);
if (!io_s.ok()) {
break;
}
}
}
if (io_s.ok()) {
IOOptions io_options;
io_s = WritableFileWriter::PrepareIOOptions(write_options, io_options);
if (io_s.ok()) {
io_s = directories_.GetWalDir()->FsyncWithDirOptions(
io_options, nullptr,
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
}
TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
/*arg=*/nullptr);
log_write_mutex_.Lock();
// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
if (io_s.ok()) {
MarkLogsSynced(current_log_number - 1, true, synced_wals);
} else {
MarkLogsNotSynced(current_log_number - 1);
}
if (!io_s.ok()) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return io_s;
}
}
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
return io_s;
}
@ -237,12 +167,12 @@ Status DBImpl::FlushMemTableToOutputFile(
// If needs_to_sync_closed_wals is true, we need to record the current
// maximum memtable ID of this column family so that a later PickMemtables()
// call will not pick memtables whose IDs are higher. This is due to the fact
// that SyncClosedLogs() may release the db mutex, and memtable switch can
// that SyncClosedWals() may release the db mutex, and memtable switch can
// happen for this column family in the meantime. The newly created memtables
// have their data backed by unsynced WALs, thus they cannot be included in
// this flush job.
// Another reason why we must record the current maximum memtable ID of this
// column family: SyncClosedLogs() may release db mutex, thus it's possible
// column family: SyncClosedWals() may release db mutex, thus it's possible
// for application to continue to insert into memtables increasing db's
// sequence number. The application may take a snapshot, but this snapshot is
// not included in `snapshot_seqs` which will be passed to flush job because
@ -256,7 +186,7 @@ Status DBImpl::FlushMemTableToOutputFile(
// If needs_to_sync_closed_wals is false, then the flush job will pick ALL
// existing memtables of the column family when PickMemTable() is called
// later. Although we won't call SyncClosedLogs() in this case, we may still
// later. Although we won't call SyncClosedWals() in this case, we may still
// call the callbacks of the listeners, i.e. NotifyOnFlushBegin() which also
// releases and re-acquires the db mutex. In the meantime, the application
// can still insert into the memtables and increase the db's sequence number.
@ -286,12 +216,12 @@ Status DBImpl::FlushMemTableToOutputFile(
bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK();
if (needs_to_sync_closed_wals) {
// SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple
// SyncClosedWals() may unlock and re-lock the log_write_mutex multiple
// times.
VersionEdit synced_wals;
bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress();
mutex_.Unlock();
log_io_s = SyncClosedLogs(write_options, job_context, &synced_wals,
log_io_s = SyncClosedWals(write_options, job_context, &synced_wals,
error_recovery_in_prog);
mutex_.Lock();
if (log_io_s.ok() && synced_wals.IsWalAddition()) {
@ -306,7 +236,7 @@ Status DBImpl::FlushMemTableToOutputFile(
error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
}
} else {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
TEST_SYNC_POINT("DBImpl::SyncClosedWals:Skip");
}
s = log_io_s;
@ -580,7 +510,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
VersionEdit synced_wals;
bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress();
mutex_.Unlock();
log_io_s = SyncClosedLogs(write_options, job_context, &synced_wals,
log_io_s = SyncClosedWals(write_options, job_context, &synced_wals,
error_recovery_in_prog);
mutex_.Lock();
if (log_io_s.ok() && synced_wals.IsWalAddition()) {

View File

@ -425,7 +425,7 @@ TEST_F(DBErrorHandlingFSTest, FlushWALWriteRetryableError) {
listener->EnableAutoRecovery(false);
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedLogs:Start",
"DBImpl::SyncClosedWals:Start",
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
@ -470,7 +470,7 @@ TEST_F(DBErrorHandlingFSTest, FlushWALAtomicWriteRetryableError) {
listener->EnableAutoRecovery(false);
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::SyncClosedLogs:Start",
"DBImpl::SyncClosedWals:Start",
[&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();