diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 6846d96d0e..b72de9a688 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -86,8 +86,8 @@ TEST_F(DBFlushTest, SyncFail) { options.env = fault_injection_env.get(); SyncPoint::GetInstance()->LoadDependency( - {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"}, - {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}}); + {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedWals:Start"}, + {"DBImpl::SyncClosedWals:Failed", "DBFlushTest::SyncFail:2"}}); SyncPoint::GetInstance()->EnableProcessing(); CreateAndReopenWithCF({"pikachu"}, options); @@ -111,8 +111,8 @@ TEST_F(DBFlushTest, SyncSkip) { Options options = CurrentOptions(); SyncPoint::GetInstance()->LoadDependency( - {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedLogs:Skip"}, - {"DBImpl::SyncClosedLogs:Skip", "DBFlushTest::SyncSkip:2"}}); + {{"DBFlushTest::SyncSkip:1", "DBImpl::SyncClosedWals:Skip"}, + {"DBImpl::SyncClosedWals:Skip", "DBFlushTest::SyncSkip:2"}}); SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); @@ -2381,7 +2381,7 @@ TEST_F(DBFlushTest, PickRightMemtables) { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) { + "DBImpl::SyncClosedWals:BeforeReLock", [&](void* /*arg*/) { ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v")); auto* cfhi = static_cast_with_check(handles_[1]); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 5cf528051b..d338a58422 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1564,62 +1564,104 @@ bool DBImpl::WALBufferIsEmpty() { Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBImpl::SyncWAL:Begin"); - autovector logs_to_sync; - bool need_log_dir_sync; - uint64_t current_log_number; + WriteOptions write_options; + VersionEdit synced_wals; + Status s = SyncWalImpl(/*include_current_wal=*/true, write_options, + /*job_context=*/nullptr, &synced_wals, + /*error_recovery_in_prog=*/false); + + if (s.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + // TODO: plumb Env::IOActivity, Env::IOPriority + const ReadOptions read_options; + s = ApplyWALToManifest(read_options, write_options, &synced_wals); + } + + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); + return s; +} + +IOStatus DBImpl::SyncWalImpl(bool include_current_wal, + const WriteOptions& write_options, + JobContext* job_context, VersionEdit* synced_wals, + bool error_recovery_in_prog) { + autovector wals_to_sync; + bool need_wal_dir_sync; + // Number of a WAL that was active at the start of call and maybe is by + // the end of the call. + uint64_t maybe_active_number; + // Sync WALs up to this number + uint64_t up_to_number; { InstrumentedMutexLock l(&log_write_mutex_); assert(!logs_.empty()); - // This SyncWAL() call only cares about logs up to this number. - current_log_number = logfile_number_; + maybe_active_number = logfile_number_; + up_to_number = + include_current_wal ? maybe_active_number : maybe_active_number - 1; - while (logs_.front().number <= current_log_number && - logs_.front().IsSyncing()) { + while (logs_.front().number <= up_to_number && logs_.front().IsSyncing()) { log_sync_cv_.Wait(); } // First check that logs are safe to sync in background. - for (auto it = logs_.begin(); - it != logs_.end() && it->number <= current_log_number; ++it) { - if (!it->writer->file()->writable_file()->IsSyncThreadSafe()) { - return Status::NotSupported( - "SyncWAL() is not supported for this implementation of WAL file", - immutable_db_options_.allow_mmap_writes - ? "try setting Options::allow_mmap_writes to false" - : Slice()); - } + if (include_current_wal && + !logs_.back().writer->file()->writable_file()->IsSyncThreadSafe()) { + return IOStatus::NotSupported( + "SyncWAL() is not supported for this implementation of WAL file", + immutable_db_options_.allow_mmap_writes + ? "try setting Options::allow_mmap_writes to false" + : Slice()); } for (auto it = logs_.begin(); - it != logs_.end() && it->number <= current_log_number; ++it) { + it != logs_.end() && it->number <= up_to_number; ++it) { auto& log = *it; log.PrepareForSync(); - logs_to_sync.push_back(log.writer); + wals_to_sync.push_back(log.writer); } - need_log_dir_sync = !log_dir_synced_; + need_wal_dir_sync = !log_dir_synced_; } - TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); - RecordTick(stats_, WAL_FILE_SYNCED); - Status status; - IOStatus io_s; - // TODO: plumb Env::IOActivity, Env::IOPriority - const ReadOptions read_options; - const WriteOptions write_options; - IOOptions opts; - io_s = WritableFileWriter::PrepareIOOptions(write_options, opts); - if (!io_s.ok()) { - status = io_s; + if (include_current_wal) { + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); } + RecordTick(stats_, WAL_FILE_SYNCED); + IOOptions opts; + IOStatus io_s = WritableFileWriter::PrepareIOOptions(write_options, opts); if (io_s.ok()) { - for (log::Writer* log : logs_to_sync) { - io_s = - log->file()->SyncWithoutFlush(opts, immutable_db_options_.use_fsync); + for (log::Writer* log : wals_to_sync) { + if (job_context) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, + log->get_log_number()); + } + if (error_recovery_in_prog) { + log->file()->reset_seen_error(); + } + if (log->get_log_number() >= maybe_active_number) { + assert(log->get_log_number() == maybe_active_number); + io_s = log->file()->SyncWithoutFlush(opts, + immutable_db_options_.use_fsync); + } else { + io_s = log->file()->Sync(opts, immutable_db_options_.use_fsync); + } if (!io_s.ok()) { - status = io_s; break; } + // Normally the log file is closed when purging obsolete file, but if + // log recycling is enabled, the log file is closed here so that it + // can be reused. + if (log->get_log_number() < maybe_active_number && + immutable_db_options_.recycle_log_file_num > 0) { + if (error_recovery_in_prog) { + log->file()->reset_seen_error(); + } + io_s = log->Close(write_options); + if (!io_s.ok()) { + break; + } + } } } if (!io_s.ok()) { @@ -1629,31 +1671,28 @@ Status DBImpl::SyncWAL() { // future writes IOStatusCheck(io_s); } - if (status.ok() && need_log_dir_sync) { - status = directories_.GetWalDir()->FsyncWithDirOptions( + if (io_s.ok() && need_wal_dir_sync) { + io_s = directories_.GetWalDir()->FsyncWithDirOptions( IOOptions(), nullptr, DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); } - TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); + if (include_current_wal) { + TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); - TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); - VersionEdit synced_wals; + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); + } else { + TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedWals:BeforeReLock", + /*arg=*/nullptr); + } { InstrumentedMutexLock l(&log_write_mutex_); - if (status.ok()) { - MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals); + if (io_s.ok()) { + MarkLogsSynced(up_to_number, need_wal_dir_sync, synced_wals); } else { - MarkLogsNotSynced(current_log_number); + MarkLogsNotSynced(up_to_number); } } - if (status.ok() && synced_wals.IsWalAddition()) { - InstrumentedMutexLock l(&mutex_); - status = ApplyWALToManifest(read_options, write_options, &synced_wals); - } - - TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); - - return status; + return io_s; } Status DBImpl::ApplyWALToManifest(const ReadOptions& read_options, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index ccee4a9b65..a7fd411d46 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1920,7 +1920,7 @@ class DBImpl : public DB { void ReleaseFileNumberFromPendingOutputs( std::unique_ptr::iterator>& v); - IOStatus SyncClosedLogs(const WriteOptions& write_options, + IOStatus SyncClosedWals(const WriteOptions& write_options, JobContext* job_context, VersionEdit* synced_wals, bool error_recovery_in_prog); @@ -2255,6 +2255,11 @@ class DBImpl : public DB { ColumnFamilyData* PickCompactionFromQueue( std::unique_ptr* token, LogBuffer* log_buffer); + IOStatus SyncWalImpl(bool include_current_wal, + const WriteOptions& write_options, + JobContext* job_context, VersionEdit* synced_wals, + bool error_recovery_in_prog); + // helper function to call after some of the logs_ were synced void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit); Status ApplyWALToManifest(const ReadOptions& read_options, @@ -2579,7 +2584,7 @@ class DBImpl : public DB { // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by // log_write_mutex_. // 9. erase() by MarkLogsSynced() protected by log_write_mutex_. - // 10. read by SyncClosedLogs() protected by only log_write_mutex_. This can + // 10. read by SyncClosedWals() protected by only log_write_mutex_. This can // happen in bg flush threads after DB::Open() returns success to // applications. // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite() @@ -2592,7 +2597,7 @@ class DBImpl : public DB { // 13. emplace_back() by SwitchMemtable() hold both mutex_ and // log_write_mutex_. This happens in the write group leader. Can conflict // with bg threads calling FindObsoleteFiles(), MarkLogsSynced(), - // SyncClosedLogs(), etc. as well as application threads calling + // SyncClosedWals(), etc. as well as application threads calling // FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties // require at least log_write_mutex_. // 14. iteration called in WriteToWAL(write_group) protected by diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index d7257ac70d..3eef4c5fdf 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -116,89 +116,19 @@ bool DBImpl::ShouldRescheduleFlushRequestToRetainUDT( return true; } -IOStatus DBImpl::SyncClosedLogs(const WriteOptions& write_options, +IOStatus DBImpl::SyncClosedWals(const WriteOptions& write_options, JobContext* job_context, VersionEdit* synced_wals, bool error_recovery_in_prog) { - TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); - InstrumentedMutexLock l(&log_write_mutex_); - autovector logs_to_sync; - uint64_t current_log_number = logfile_number_; - while (logs_.front().number < current_log_number && - logs_.front().IsSyncing()) { - log_sync_cv_.Wait(); + TEST_SYNC_POINT("DBImpl::SyncClosedWals:Start"); + + IOStatus io_s = SyncWalImpl(/*include_current_wal*/ false, write_options, + job_context, synced_wals, error_recovery_in_prog); + if (!io_s.ok()) { + TEST_SYNC_POINT("DBImpl::SyncClosedWals:Failed"); + } else { + TEST_SYNC_POINT("DBImpl::SyncClosedWals:end"); } - for (auto it = logs_.begin(); - it != logs_.end() && it->number < current_log_number; ++it) { - auto& log = *it; - log.PrepareForSync(); - logs_to_sync.push_back(log.writer); - } - - IOStatus io_s; - if (!logs_to_sync.empty()) { - log_write_mutex_.Unlock(); - - assert(job_context); - - for (log::Writer* log : logs_to_sync) { - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, - log->get_log_number()); - if (error_recovery_in_prog) { - log->file()->reset_seen_error(); - } - - IOOptions io_options; - io_s = WritableFileWriter::PrepareIOOptions(write_options, io_options); - if (!io_s.ok()) { - break; - } - io_s = log->file()->Sync(io_options, immutable_db_options_.use_fsync); - if (!io_s.ok()) { - break; - } - - if (immutable_db_options_.recycle_log_file_num > 0) { - if (error_recovery_in_prog) { - log->file()->reset_seen_error(); - } - // Normally the log file is closed when purging obsolete file, but if - // log recycling is enabled, the log file is closed here so that it - // can be reused. - io_s = log->Close(write_options); - if (!io_s.ok()) { - break; - } - } - } - if (io_s.ok()) { - IOOptions io_options; - io_s = WritableFileWriter::PrepareIOOptions(write_options, io_options); - if (io_s.ok()) { - io_s = directories_.GetWalDir()->FsyncWithDirOptions( - io_options, nullptr, - DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); - } - } - - TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock", - /*arg=*/nullptr); - log_write_mutex_.Lock(); - - // "number <= current_log_number - 1" is equivalent to - // "number < current_log_number". - if (io_s.ok()) { - MarkLogsSynced(current_log_number - 1, true, synced_wals); - } else { - MarkLogsNotSynced(current_log_number - 1); - } - if (!io_s.ok()) { - TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); - return io_s; - } - } - TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end"); return io_s; } @@ -237,12 +167,12 @@ Status DBImpl::FlushMemTableToOutputFile( // If needs_to_sync_closed_wals is true, we need to record the current // maximum memtable ID of this column family so that a later PickMemtables() // call will not pick memtables whose IDs are higher. This is due to the fact - // that SyncClosedLogs() may release the db mutex, and memtable switch can + // that SyncClosedWals() may release the db mutex, and memtable switch can // happen for this column family in the meantime. The newly created memtables // have their data backed by unsynced WALs, thus they cannot be included in // this flush job. // Another reason why we must record the current maximum memtable ID of this - // column family: SyncClosedLogs() may release db mutex, thus it's possible + // column family: SyncClosedWals() may release db mutex, thus it's possible // for application to continue to insert into memtables increasing db's // sequence number. The application may take a snapshot, but this snapshot is // not included in `snapshot_seqs` which will be passed to flush job because @@ -256,7 +186,7 @@ Status DBImpl::FlushMemTableToOutputFile( // If needs_to_sync_closed_wals is false, then the flush job will pick ALL // existing memtables of the column family when PickMemTable() is called - // later. Although we won't call SyncClosedLogs() in this case, we may still + // later. Although we won't call SyncClosedWals() in this case, we may still // call the callbacks of the listeners, i.e. NotifyOnFlushBegin() which also // releases and re-acquires the db mutex. In the meantime, the application // can still insert into the memtables and increase the db's sequence number. @@ -286,12 +216,12 @@ Status DBImpl::FlushMemTableToOutputFile( bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); if (needs_to_sync_closed_wals) { - // SyncClosedLogs() may unlock and re-lock the log_write_mutex multiple + // SyncClosedWals() may unlock and re-lock the log_write_mutex multiple // times. VersionEdit synced_wals; bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress(); mutex_.Unlock(); - log_io_s = SyncClosedLogs(write_options, job_context, &synced_wals, + log_io_s = SyncClosedWals(write_options, job_context, &synced_wals, error_recovery_in_prog); mutex_.Lock(); if (log_io_s.ok() && synced_wals.IsWalAddition()) { @@ -306,7 +236,7 @@ Status DBImpl::FlushMemTableToOutputFile( error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush); } } else { - TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip"); + TEST_SYNC_POINT("DBImpl::SyncClosedWals:Skip"); } s = log_io_s; @@ -580,7 +510,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( VersionEdit synced_wals; bool error_recovery_in_prog = error_handler_.IsRecoveryInProgress(); mutex_.Unlock(); - log_io_s = SyncClosedLogs(write_options, job_context, &synced_wals, + log_io_s = SyncClosedWals(write_options, job_context, &synced_wals, error_recovery_in_prog); mutex_.Lock(); if (log_io_s.ok() && synced_wals.IsWalAddition()) { diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc index c6639bf197..a4eaad1531 100644 --- a/db/error_handler_fs_test.cc +++ b/db/error_handler_fs_test.cc @@ -425,7 +425,7 @@ TEST_F(DBErrorHandlingFSTest, FlushWALWriteRetryableError) { listener->EnableAutoRecovery(false); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::SyncClosedLogs:Start", + "DBImpl::SyncClosedWals:Start", [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); }); SyncPoint::GetInstance()->EnableProcessing(); @@ -470,7 +470,7 @@ TEST_F(DBErrorHandlingFSTest, FlushWALAtomicWriteRetryableError) { listener->EnableAutoRecovery(false); SyncPoint::GetInstance()->SetCallBack( - "DBImpl::SyncClosedLogs:Start", + "DBImpl::SyncClosedWals:Start", [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); }); SyncPoint::GetInstance()->EnableProcessing();