Ensure Close() before LinkFile() for WALs in Checkpoint (#12734)

Summary:
POSIX semantics for LinkFile (hard links) allow linking a file
that is still being written to, with both the source and destination
showing any subsequent writes to the source. This may not be practical
semantics for some FileSystem implementations such as remote storage.
They might only link the flushed or sync-ed file contents at time of
LinkFile, or might even have undefined behavior if LinkFile is called on
a file still open for write (not yet "sealed"). This change builds on https://github.com/facebook/rocksdb/issues/12731
to bring more hygiene to our handling of WAL files in Checkpoint.

Specifically, we now Close WAL files as soon as they are either
(a) inactive and fully synced, or (b) inactive and obsolete (so maybe
never fully synced), rather than letting Close() happen in handling
obsolete files (maybe a background thread). This should not be a
performance issue as Close() should be trivial cost relative to other
IO ops, but just in case:
* We don't Close() while holding a mutex, to avoid blocking, and
* The old behavior is available with a new kill switch option
  `background_close_inactive_wals`.

Stacked on https://github.com/facebook/rocksdb/issues/12731

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12734

Test Plan:
Extended existing unit test, especially adding a hygiene
check to FaultInjectionTestFS to detect LinkFile() on a file still open
for writes. FaultInjectionTestFS already has relevant tracking data, and
tests can opt out of the new check, as in a smoke test I have left for
the old, deprecated functionality `background_close_inactive_wals=true`.

Also ran lengthy blackbox_crash_test to ensure the hygiene check is OK
with the crash test. (The only place I can find we use LinkFile in
production is Checkpoint.)

Reviewed By: cbi42

Differential Revision: D58295284

Pulled By: pdillinger

fbshipit-source-id: 64d90ed8477e2366c19eaf9c4c5ad60b82cac5c6
This commit is contained in:
Peter Dillinger 2024-06-12 11:48:45 -07:00 committed by Facebook GitHub Bot
parent d64eac28d3
commit 0646ec6e2d
15 changed files with 126 additions and 21 deletions

View File

@ -3179,6 +3179,8 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile1) {
SpecialEnv env(Env::Default());
db_options_.env = &env;
db_options_.max_background_flushes = 1;
// When this option is removed, the test will need re-engineering
db_options_.background_close_inactive_wals = true;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(2));
Open();
@ -3231,6 +3233,8 @@ TEST_P(ColumnFamilyTest, IteratorCloseWALFile2) {
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
// When this option is removed, the test will need re-engineering
db_options_.background_close_inactive_wals = true;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(2));
Open();
@ -3288,6 +3292,8 @@ TEST_P(ColumnFamilyTest, ForwardIteratorCloseWALFile) {
env.SetBackgroundThreads(2, Env::HIGH);
db_options_.env = &env;
db_options_.max_background_flushes = 1;
// When this option is removed, the test will need re-engineering
db_options_.background_close_inactive_wals = true;
column_family_options_.memtable_factory.reset(
test::NewSpecialSkipListFactory(3));
column_family_options_.level0_file_num_compaction_trigger = 2;

View File

@ -401,7 +401,7 @@ Status DBImpl::GetLiveFilesStorageInfo(
// fully synced. Although the output DB of a Checkpoint or Backup needs
// to be fully synced on return, we don't strictly need to sync this
// DB (the input DB). If we allow Checkpoint to hard link an inactive
// WAL that isn't fully synced, that could result in an unsufficiently
// WAL that isn't fully synced, that could result in an insufficiently
// sync-ed Checkpoint. Here we get the set of WALs that are potentially
// unsynced or still being written to, to prevent them from being hard
// linked. Enforcing max_log_num from above ensures any new WALs after

View File

@ -1550,6 +1550,7 @@ bool DBImpl::WALBufferIsEmpty() {
}
Status DBImpl::GetOpenWalSizes(std::map<uint64_t, uint64_t>& number_to_size) {
assert(number_to_size.empty());
InstrumentedMutexLock l(&log_write_mutex_);
for (auto& log : logs_) {
auto* open_file = log.writer->file();
@ -1627,6 +1628,7 @@ IOStatus DBImpl::SyncWalImpl(bool include_current_wal,
RecordTick(stats_, WAL_FILE_SYNCED);
IOOptions opts;
IOStatus io_s = WritableFileWriter::PrepareIOOptions(write_options, opts);
std::list<log::Writer*> wals_internally_closed;
if (io_s.ok()) {
for (log::Writer* log : wals_to_sync) {
if (job_context) {
@ -1647,15 +1649,22 @@ IOStatus DBImpl::SyncWalImpl(bool include_current_wal,
if (!io_s.ok()) {
break;
}
// Normally the log file is closed when purging obsolete file, but if
// log recycling is enabled, the log file is closed here so that it
// can be reused.
// WALs can be closed when purging obsolete files, but if recycling is
// enabled, the log file is closed here so that it can be reused. And
// immediate closure here upon final sync makes it easier to guarantee
// that Checkpoint doesn't LinkFile on a WAL still open for write, which
// might be unsupported for some FileSystem implementations. Close here
// should be inexpensive because flush and sync are done, so the kill
// switch background_close_inactive_wals is expected to be removed in
// the future.
if (log->get_log_number() < maybe_active_number &&
immutable_db_options_.recycle_log_file_num > 0) {
(immutable_db_options_.recycle_log_file_num > 0 ||
!immutable_db_options_.background_close_inactive_wals)) {
if (error_recovery_in_prog) {
log->file()->reset_seen_error();
}
io_s = log->Close(write_options);
io_s = log->file()->Close(opts);
wals_internally_closed.push_back(log);
if (!io_s.ok()) {
break;
}
@ -1684,6 +1693,12 @@ IOStatus DBImpl::SyncWalImpl(bool include_current_wal,
}
{
InstrumentedMutexLock l(&log_write_mutex_);
for (auto* wal : wals_internally_closed) {
// We can only modify the state of log::Writer under the mutex
bool was_closed = wal->PublishIfClosed();
assert(was_closed);
(void)was_closed;
}
if (io_s.ok()) {
MarkLogsSynced(up_to_number, need_wal_dir_sync, synced_wals);
} else {
@ -1808,16 +1823,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
// Check if the file has been closed, i.e wal.writer->file() == nullptr
// which can happen if log recycling is enabled, or if all the data in
// the log has been synced
// Reclaim closed WALs (wal.writer->file() == nullptr), and if we don't
// need to close before that (background_close_inactive_wals) we can
// opportunistically reclaim WALs that happen to be fully synced.
// (Probably not worth extra code and mutex release to opportunistically
// close WALs that became eligible since last holding the mutex.
// FindObsoleteFiles can take care of it.)
if (wal.writer->file() == nullptr ||
wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
(immutable_db_options_.background_close_inactive_wals &&
wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize())) {
// Fully synced
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
} else {
assert(wal.GetPreSyncSize() < wal.writer->file()->GetFlushedSize());
wal.FinishSync();
++it;
}

View File

@ -312,9 +312,26 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// logs_ could have changed while we were waiting.
continue;
}
// This WAL file is not live, so it's OK if we never sync the rest of it
// or if we close it *after* removing from `logs_`. If it's already
// closed, then it's been fully synced.
// This WAL file is not live, so it's OK if we never sync the rest of it.
// If it's already closed, then it's been fully synced. If
// !background_close_inactive_wals then we need to Close it before
// removing from logs_ but not blocking while holding log_write_mutex_.
if (!immutable_db_options_.background_close_inactive_wals &&
log.writer->file()) {
// We are taking ownership of and pinning the front entry, so we can
// expect it to be the same after releasing and re-acquiring the lock
log.PrepareForSync();
log_write_mutex_.Unlock();
// TODO: maybe check the return value of Close.
// TODO: plumb Env::IOActivity, Env::IOPriority
auto s = log.writer->file()->Close({});
s.PermitUncheckedError();
log_write_mutex_.Lock();
log.writer->PublishIfClosed();
assert(&log == &logs_.front());
log.FinishSync();
log_sync_cv_.SignalAll();
}
logs_to_free_.push_back(log.ReleaseWriter());
logs_.pop_front();
}
@ -506,7 +523,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
// TODO: plumb Env::IOActivity, Env::IOPriority
auto s = w->Close(WriteOptions());
auto s = w->Close({});
s.PermitUncheckedError();
}

View File

@ -74,6 +74,15 @@ IOStatus Writer::Close(const WriteOptions& write_options) {
return s;
}
bool Writer::PublishIfClosed() {
  // Nothing to publish while the destination file is still open; leave this
  // Writer's state untouched and report that to the caller.
  if (!dest_->IsClosed()) {
    return false;
  }
  // The destination was closed through file(); drop our handle so this
  // Writer's own state reflects the closure.
  dest_.reset();
  return true;
}
IOStatus Writer::AddRecord(const WriteOptions& write_options,
const Slice& slice) {
if (dest_->seen_error()) {

View File

@ -107,6 +107,11 @@ class Writer {
IOStatus Close(const WriteOptions& write_options);
// If closing the writer through file(), call this afterwards to modify
// this object's state to reflect that. Returns true if the destination file
// has been closed. If it hasn't been closed, returns false with no change.
bool PublishIfClosed();
bool BufferIsEmpty();
size_t TEST_block_offset() const { return block_offset_; }

View File

@ -278,7 +278,9 @@ class WritableFileWriter {
bool use_direct_io() { return writable_file_->use_direct_io(); }
bool BufferIsEmpty() { return buf_.CurrentSize() == 0; }
bool BufferIsEmpty() const { return buf_.CurrentSize() == 0; }
// Reports whether this writer no longer holds an underlying writable file,
// i.e. writable_file_ is null.
bool IsClosed() const { return writable_file_.get() == nullptr; }
void TEST_SetFileChecksumGenerator(
FileChecksumGenerator* checksum_generator) {

View File

@ -1347,6 +1347,15 @@ struct DBOptions {
// the WAL is read.
CompressionType wal_compression = kNoCompression;
// Set to true to re-instate an old behavior of keeping complete, synced WAL
// files open for write until they are collected for deletion by a
// background thread. This should not be needed unless there is a
// performance issue with file Close(), but setting it to true means that
// Checkpoint might call LinkFile on a WAL still open for write, which might
// be unsupported on some FileSystem implementations. As this is intended as
// a temporary kill switch, it is already DEPRECATED.
bool background_close_inactive_wals = false;
// If true, RocksDB supports flushing multiple column families and committing
// their results atomically to MANIFEST. Note that it is not
// necessary to set atomic_flush to true if WAL is always enabled since WAL

View File

@ -388,6 +388,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, wal_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"background_close_inactive_wals",
{offsetof(struct ImmutableDBOptions, background_close_inactive_wals),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"seq_per_batch",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
@ -755,6 +759,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
two_write_queues(options.two_write_queues),
manual_wal_flush(options.manual_wal_flush),
wal_compression(options.wal_compression),
background_close_inactive_wals(options.background_close_inactive_wals),
atomic_flush(options.atomic_flush),
avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io),
persist_stats_to_disk(options.persist_stats_to_disk),
@ -921,6 +926,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
manual_wal_flush);
ROCKS_LOG_HEADER(log, " Options.wal_compression: %d",
wal_compression);
ROCKS_LOG_HEADER(log,
" Options.background_close_inactive_wals: %d",
background_close_inactive_wals);
ROCKS_LOG_HEADER(log, " Options.atomic_flush: %d", atomic_flush);
ROCKS_LOG_HEADER(log,
" Options.avoid_unnecessary_blocking_io: %d",

View File

@ -84,6 +84,7 @@ struct ImmutableDBOptions {
bool two_write_queues;
bool manual_wal_flush;
CompressionType wal_compression;
bool background_close_inactive_wals;
bool atomic_flush;
bool avoid_unnecessary_blocking_io;
bool persist_stats_to_disk;

View File

@ -353,6 +353,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"two_write_queues=false;"
"manual_wal_flush=false;"
"wal_compression=kZSTD;"
"background_close_inactive_wals=true;"
"seq_per_batch=false;"
"atomic_flush=false;"
"avoid_unnecessary_blocking_io=false;"

View File

@ -0,0 +1 @@
* Inactive WALs are immediately closed upon being fully sync-ed rather than in a background thread. This is to ensure LinkFile() is not called on files still open for write, which might not be supported by some FileSystem implementations. This should not be a performance issue, but an opt-out is available with the new DB option `background_close_inactive_wals`.

View File

@ -762,27 +762,38 @@ TEST_F(CheckpointTest, CheckpointWithParallelWrites) {
class CheckpointTestWithWalParams
: public CheckpointTest,
public testing::WithParamInterface<std::tuple<uint64_t, bool, bool>> {
public testing::WithParamInterface<
std::tuple<uint64_t, bool, bool, bool>> {
public:
uint64_t GetLogSizeForFlush() { return std::get<0>(GetParam()); }
bool GetWalsInManifest() { return std::get<1>(GetParam()); }
bool GetManualWalFlush() { return std::get<2>(GetParam()); }
bool GetBackgroundCloseInactiveWals() { return std::get<3>(GetParam()); }
};
INSTANTIATE_TEST_CASE_P(CheckpointTestWithWalParams,
CheckpointTestWithWalParams,
INSTANTIATE_TEST_CASE_P(NormalWalParams, CheckpointTestWithWalParams,
::testing::Combine(::testing::Values(0U, 100000000U),
::testing::Bool(),
::testing::Bool()));
::testing::Bool(), ::testing::Bool(),
::testing::Values(false)));
INSTANTIATE_TEST_CASE_P(DeprecatedWalParams, CheckpointTestWithWalParams,
::testing::Values(std::make_tuple(100000000U, true,
false, true)));
TEST_P(CheckpointTestWithWalParams, CheckpointWithUnsyncedDataDropped) {
Options options = CurrentOptions();
options.max_write_buffer_number = 4;
options.track_and_verify_wals_in_manifest = GetWalsInManifest();
options.manual_wal_flush = GetManualWalFlush();
options.background_close_inactive_wals = GetBackgroundCloseInactiveWals();
auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
if (options.background_close_inactive_wals) {
// Disable this hygiene check when the fix is disabled
fault_fs->SetAllowLinkOpenFile();
}
options.env = fault_fs_env.get();
Reopen(options);
ASSERT_OK(Put("key1", "val1"));

View File

@ -17,6 +17,7 @@
#include "utilities/fault_injection_fs.h"
#include <algorithm>
#include <cstdio>
#include <functional>
#include <utility>
@ -886,6 +887,12 @@ IOStatus FaultInjectionTestFS::LinkFile(const std::string& s,
if (io_s.ok()) {
{
MutexLock l(&mutex_);
if (!allow_link_open_file_ &&
open_managed_files_.find(s) != open_managed_files_.end()) {
fprintf(stderr, "Attempt to LinkFile while open for write: %s\n",
s.c_str());
abort();
}
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
}

View File

@ -203,6 +203,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
filesystem_active_(true),
filesystem_writable_(false),
read_unsynced_data_(true),
allow_link_open_file_(false),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)),
enable_write_error_injection_(false),
enable_metadata_write_error_injection_(false),
@ -365,6 +366,14 @@ class FaultInjectionTestFS : public FileSystemWrapper {
read_unsynced_data_ = read_unsynced_data;
}
bool ReadUnsyncedData() const { return read_unsynced_data_; }
// FaultInjectionTestFS normally includes a hygiene check for FileSystem
// implementations that only support LinkFile() on closed files (not open
// for write). Setting this to true bypasses the check.
//
// allow_link_open_file: new value for the bypass flag. Defaults to true, so
// calling this with no argument disables the check; pass false to turn the
// check back on.
void SetAllowLinkOpenFile(bool allow_link_open_file = true) {
allow_link_open_file_ = allow_link_open_file;
}
// Asserts that open_managed_files_ is empty. Implemented with assert(), so it
// is a no-op in builds where assertions are compiled out.
void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
IOStatus GetError() { return error_; }
@ -565,6 +574,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
// to underlying FS for writable files
bool read_unsynced_data_; // See SetReadUnsyncedData()
bool allow_link_open_file_; // See SetAllowLinkOpenFile()
IOStatus error_;
enum ErrorType : int {