FaultInjectionTestFS read unsynced data by default (#12729)

Summary:
In places (e.g. GetSortedWals()) RocksDB relies on querying the file size or even reading the contents of files currently open for writing, and as in POSIX semantics, expects to see the flushed size and contents regardless of what has been synced. FaultInjectionTestFS historically did not emulate this behavior, only showing synced data from such read operations. (Different from FaultInjectionTestEnv--sigh.)

This change makes the "proper" behavior the default behavior, at least for GetFileSize and FSSequentialFile. However, this new functionality is disabled in db_stress because of undiagnosed, unresolved issues.

Also removes unused and confusing field `pos_at_last_flush_`

This change is needed to support testing a relevant bug fix (in a follow-up diff).  Other suggested follow-up:
* Fix db_stress not to rely on the old behavior, and fix a related FIXME in db_stress_test_base.cc in LockWAL testing.
* Fill in some corner cases in the FileSystem API for reading unsynced data (see new TODO items).
* Consider deprecating and removing Flush() API functions from FileSystem APIs. It is not clear to me that there is a supported scenario in which they do anything but confuse API users and developers. If there is a use for them, it doesn't appear to be tested.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12729

Test Plan: applies to all unit tests successfully, just updating the unit test from https://github.com/facebook/rocksdb/issues/12556 due to relying on the errant behavior. Also added a specific unit test

Reviewed By: hx235

Differential Revision: D58091835

Pulled By: pdillinger

fbshipit-source-id: f47a63b2b000f5875b6293a98577bff663d7fd33
This commit is contained in:
Peter Dillinger 2024-06-04 15:25:23 -07:00 committed by Facebook GitHub Bot
parent 8523f0a86a
commit 9f4c597d83
5 changed files with 165 additions and 21 deletions

View File

@ -630,13 +630,13 @@ INSTANTIATE_TEST_CASE_P(
std::make_tuple(false, kSyncWal, kEnd), std::make_tuple(false, kSyncWal, kEnd),
std::make_tuple(true, kSyncWal, kEnd))); std::make_tuple(true, kSyncWal, kEnd)));
class FaultInjectionFSTest : public DBTestBase { class FaultInjectionDBTest : public DBTestBase {
public: public:
FaultInjectionFSTest() FaultInjectionDBTest()
: DBTestBase("fault_injection_fs_test", /*env_do_fsync=*/false) {} : DBTestBase("fault_injection_fs_test", /*env_do_fsync=*/false) {}
}; };
TEST_F(FaultInjectionFSTest, SyncWALDuringDBClose) { TEST_F(FaultInjectionDBTest, SyncWALDuringDBClose) {
std::shared_ptr<FaultInjectionTestFS> fault_fs = std::shared_ptr<FaultInjectionTestFS> fault_fs =
std::make_shared<FaultInjectionTestFS>(env_->GetFileSystem()); std::make_shared<FaultInjectionTestFS>(env_->GetFileSystem());
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs)); std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fault_fs));
@ -647,6 +647,7 @@ TEST_F(FaultInjectionFSTest, SyncWALDuringDBClose) {
Reopen(options); Reopen(options);
ASSERT_OK(Put("k1", "v1")); ASSERT_OK(Put("k1", "v1"));
Close(); Close();
ASSERT_OK(fault_fs->DropUnsyncedFileData());
Reopen(options); Reopen(options);
ASSERT_EQ("NOT_FOUND", Get("k1")); ASSERT_EQ("NOT_FOUND", Get("k1"));
Destroy(options); Destroy(options);
@ -659,6 +660,54 @@ TEST_F(FaultInjectionFSTest, SyncWALDuringDBClose) {
ASSERT_EQ("v1", Get("k1")); ASSERT_EQ("v1", Get("k1"));
Destroy(options); Destroy(options);
} }
TEST(FaultInjectionFSTest, ReadUnsyncedData) {
std::shared_ptr<FaultInjectionTestFS> fault_fs =
std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
ASSERT_TRUE(fault_fs->ReadUnsyncedData());
uint32_t len = Random::GetTLSInstance()->Uniform(10000) + 1;
Random rnd(len);
// Create partially synced file
std::string f = test::PerThreadDBPath("read_unsynced.data");
std::string data = rnd.RandomString(len);
{
std::unique_ptr<FSWritableFile> w;
ASSERT_OK(fault_fs->NewWritableFile(f, {}, &w, nullptr));
uint32_t synced_len = rnd.Uniform(len + 1);
ASSERT_OK(w->Append(Slice(data.data(), synced_len), {}, nullptr));
ASSERT_OK(w->Sync({}, nullptr));
ASSERT_OK(w->Append(Slice(data.data() + synced_len, len - synced_len), {},
nullptr));
// no sync here
ASSERT_OK(w->Close({}, nullptr));
}
// Test file size includes unsynced data
{
uint64_t file_size;
ASSERT_OK(fault_fs->GetFileSize(f, {}, &file_size, nullptr));
ASSERT_EQ(len, file_size);
}
// Test read file contents, with two reads that probably don't
// align with the unsynced split.
{
std::unique_ptr<FSSequentialFile> r;
ASSERT_OK(fault_fs->NewSequentialFile(f, {}, &r, nullptr));
uint32_t first_read_len = rnd.Uniform(len + 1);
Slice sl;
std::unique_ptr<char[]> scratch(new char[first_read_len]);
ASSERT_OK(r->Read(first_read_len, {}, &sl, scratch.get(), nullptr));
ASSERT_EQ(first_read_len, sl.size());
ASSERT_EQ(0, sl.compare(Slice(data.data(), first_read_len)));
uint32_t second_read_len = len - first_read_len;
scratch.reset(new char[second_read_len]);
ASSERT_OK(r->Read(second_read_len, {}, &sl, scratch.get(), nullptr));
ASSERT_EQ(second_read_len, sl.size());
ASSERT_EQ(0,
sl.compare(Slice(data.data() + first_read_len, second_read_len)));
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -20,10 +20,10 @@
// NOTE that if FLAGS_test_batches_snapshots is set, the test will have // NOTE that if FLAGS_test_batches_snapshots is set, the test will have
// different behavior. See comment of the flag for details. // different behavior. See comment of the flag for details.
#include "db_stress_tool/db_stress_shared_state.h"
#ifdef GFLAGS #ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
@ -93,6 +93,10 @@ int db_stress_tool(int argc, char** argv) {
// This will be overwritten in StressTest::Open() for open fault injection // This will be overwritten in StressTest::Open() for open fault injection
// and in RunStressTestImpl() for proper write fault injection setup. // and in RunStressTestImpl() for proper write fault injection setup.
fault_fs_guard->SetFilesystemDirectWritable(true); fault_fs_guard->SetFilesystemDirectWritable(true);
// FIXME: For some reason(s), db_stress currently relies on the WRONG
// semantic of reading only synced data from files currently open for
// write.
fault_fs_guard->SetReadUnsyncedData(false);
fault_env_guard = fault_env_guard =
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard); std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
raw_env = fault_env_guard.get(); raw_env = fault_env_guard.get();

View File

@ -256,6 +256,8 @@ class WritableFileWriter {
// returns NotSupported status. // returns NotSupported status.
IOStatus SyncWithoutFlush(const IOOptions& opts, bool use_fsync); IOStatus SyncWithoutFlush(const IOOptions& opts, bool use_fsync);
// Size including unflushed data written to this writer. If the next op is
// a successful Close, the file size will be this.
uint64_t GetFileSize() const { uint64_t GetFileSize() const {
return filesize_.load(std::memory_order_acquire); return filesize_.load(std::memory_order_acquire);
} }

View File

@ -273,9 +273,6 @@ IOStatus TestFSWritableFile::Flush(const IOOptions&, IODebugContext*) {
if (!fs_->IsFilesystemActive()) { if (!fs_->IsFilesystemActive()) {
return fs_->GetError(); return fs_->GetError();
} }
if (fs_->IsFilesystemActive()) {
state_.pos_at_last_flush_ = state_.pos_;
}
return IOStatus::OK(); return IOStatus::OK();
} }
@ -355,6 +352,7 @@ IOStatus TestFSRandomRWFile::Read(uint64_t offset, size_t n,
if (!fs_->IsFilesystemActive()) { if (!fs_->IsFilesystemActive()) {
return fs_->GetError(); return fs_->GetError();
} }
// TODO (low priority): fs_->ReadUnsyncedData()
return target_->Read(offset, n, options, result, scratch, dbg); return target_->Read(offset, n, options, result, scratch, dbg);
} }
@ -399,6 +397,7 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
return fs_->GetError(); return fs_->GetError();
} }
IOStatus s = target_->Read(offset, n, options, result, scratch, dbg); IOStatus s = target_->Read(offset, n, options, result, scratch, dbg);
// TODO (low priority): fs_->ReadUnsyncedData()
if (s.ok()) { if (s.ok()) {
s = fs_->InjectThreadSpecificReadError( s = fs_->InjectThreadSpecificReadError(
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(), FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
@ -432,6 +431,7 @@ IOStatus TestFSRandomAccessFile::ReadAsync(
s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr); s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
} }
} }
// TODO (low priority): fs_->ReadUnsyncedData()
if (!ret.ok()) { if (!ret.ok()) {
res.status = ret; res.status = ret;
cb(res, cb_arg); cb(res, cb_arg);
@ -459,6 +459,7 @@ IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
/*need_count_increase=*/true, /*need_count_increase=*/true,
/*fault_injected=*/&this_injected_error); /*fault_injected=*/&this_injected_error);
injected_error |= this_injected_error; injected_error |= this_injected_error;
// TODO (low priority): fs_->ReadUnsyncedData()
} }
if (s.ok()) { if (s.ok()) {
s = fs_->InjectThreadSpecificReadError( s = fs_->InjectThreadSpecificReadError(
@ -479,12 +480,55 @@ size_t TestFSRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
return target_->GetUniqueId(id, max_size); return target_->GetUniqueId(id, max_size);
} }
} }
void FaultInjectionTestFS::AddUnsyncedToRead(const std::string& fname,
size_t pos, size_t n,
Slice* result, char* scratch) {
// Should be checked prior
assert(result->size() < n);
size_t pos_after = pos + result->size();
MutexLock l(&mutex_);
auto it = db_file_state_.find(fname);
if (it != db_file_state_.end()) {
auto& st = it->second;
if (st.pos_ > static_cast<ssize_t>(pos_after)) {
size_t remaining_requested = n - result->size();
size_t to_copy = std::min(remaining_requested,
static_cast<size_t>(st.pos_) - pos_after);
size_t buffer_offset = pos_after - static_cast<size_t>(std::max(
st.pos_at_last_sync_, ssize_t{0}));
// Data might have been dropped from buffer
if (st.buffer_.size() > buffer_offset) {
to_copy = std::min(to_copy, st.buffer_.size() - buffer_offset);
if (result->data() != scratch) {
// TODO: this will be needed when supporting random reads
// but not currently used
abort();
// NOTE: might overlap
// std::copy_n(result->data(), result->size(), scratch);
}
std::copy_n(st.buffer_.data() + buffer_offset, to_copy,
scratch + result->size());
*result = Slice(scratch, result->size() + to_copy);
}
}
}
}
IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options, IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
Slice* result, char* scratch, Slice* result, char* scratch,
IODebugContext* dbg) { IODebugContext* dbg) {
IOStatus s = target()->Read(n, options, result, scratch, dbg); IOStatus s = target()->Read(n, options, result, scratch, dbg);
if (s.ok() && fs_->ShouldInjectRandomReadError()) { if (s.ok()) {
return IOStatus::IOError("injected seq read error"); if (fs_->ShouldInjectRandomReadError()) {
read_pos_ += result->size();
return IOStatus::IOError("injected seq read error");
}
if (fs_->ReadUnsyncedData() && result->size() < n) {
fs_->AddUnsyncedToRead(fname_, read_pos_, n, result, scratch);
}
read_pos_ += result->size();
} }
return s; return s;
} }
@ -495,8 +539,11 @@ IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
IODebugContext* dbg) { IODebugContext* dbg) {
IOStatus s = IOStatus s =
target()->PositionedRead(offset, n, options, result, scratch, dbg); target()->PositionedRead(offset, n, options, result, scratch, dbg);
if (s.ok() && fs_->ShouldInjectRandomReadError()) { if (s.ok()) {
return IOStatus::IOError("injected seq positioned read error"); if (fs_->ShouldInjectRandomReadError()) {
return IOStatus::IOError("injected seq positioned read error");
}
// TODO (low priority): fs_->ReadUnsyncedData()
} }
return s; return s;
} }
@ -713,7 +760,7 @@ IOStatus FaultInjectionTestFS::NewSequentialFile(
} }
IOStatus io_s = target()->NewSequentialFile(fname, file_opts, result, dbg); IOStatus io_s = target()->NewSequentialFile(fname, file_opts, result, dbg);
if (io_s.ok()) { if (io_s.ok()) {
result->reset(new TestFSSequentialFile(std::move(*result), this)); result->reset(new TestFSSequentialFile(std::move(*result), this, fname));
} }
return io_s; return io_s;
} }
@ -743,6 +790,26 @@ IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
return io_s; return io_s;
} }
IOStatus FaultInjectionTestFS::GetFileSize(const std::string& f,
const IOOptions& options,
uint64_t* file_size,
IODebugContext* dbg) {
// TODO: inject error, under what setting?
IOStatus io_s = target()->GetFileSize(f, options, file_size, dbg);
if (!io_s.ok()) {
return io_s;
}
if (ReadUnsyncedData()) {
// Need to report flushed size, not synced size
MutexLock l(&mutex_);
auto it = db_file_state_.find(f);
if (it != db_file_state_.end()) {
*file_size = it->second.pos_;
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::RenameFile(const std::string& s, IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
const std::string& t, const std::string& t,
const IOOptions& options, const IOOptions& options,

View File

@ -36,16 +36,12 @@ struct FSFileState {
std::string filename_; std::string filename_;
ssize_t pos_; ssize_t pos_;
ssize_t pos_at_last_sync_; ssize_t pos_at_last_sync_;
ssize_t pos_at_last_flush_;
std::string buffer_; std::string buffer_;
explicit FSFileState(const std::string& filename) explicit FSFileState(const std::string& filename)
: filename_(filename), : filename_(filename), pos_(-1), pos_at_last_sync_(-1) {}
pos_(-1),
pos_at_last_sync_(-1),
pos_at_last_flush_(-1) {}
FSFileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {} FSFileState() : pos_(-1), pos_at_last_sync_(-1) {}
bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; } bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; }
@ -163,8 +159,10 @@ class TestFSRandomAccessFile : public FSRandomAccessFile {
class TestFSSequentialFile : public FSSequentialFileOwnerWrapper { class TestFSSequentialFile : public FSSequentialFileOwnerWrapper {
public: public:
explicit TestFSSequentialFile(std::unique_ptr<FSSequentialFile>&& f, explicit TestFSSequentialFile(std::unique_ptr<FSSequentialFile>&& f,
FaultInjectionTestFS* fs) FaultInjectionTestFS* fs, std::string fname)
: FSSequentialFileOwnerWrapper(std::move(f)), fs_(fs) {} : FSSequentialFileOwnerWrapper(std::move(f)),
fs_(fs),
fname_(std::move(fname)) {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result, IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override; char* scratch, IODebugContext* dbg) override;
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options, IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
@ -173,13 +171,15 @@ class TestFSSequentialFile : public FSSequentialFileOwnerWrapper {
private: private:
FaultInjectionTestFS* fs_; FaultInjectionTestFS* fs_;
std::string fname_;
size_t read_pos_ = 0;
}; };
class TestFSDirectory : public FSDirectory { class TestFSDirectory : public FSDirectory {
public: public:
explicit TestFSDirectory(FaultInjectionTestFS* fs, std::string dirname, explicit TestFSDirectory(FaultInjectionTestFS* fs, std::string dirname,
FSDirectory* dir) FSDirectory* dir)
: fs_(fs), dirname_(dirname), dir_(dir) {} : fs_(fs), dirname_(std::move(dirname)), dir_(dir) {}
~TestFSDirectory() {} ~TestFSDirectory() {}
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override; IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
@ -202,6 +202,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
: FileSystemWrapper(base), : FileSystemWrapper(base),
filesystem_active_(true), filesystem_active_(true),
filesystem_writable_(false), filesystem_writable_(false),
read_unsynced_data_(true),
thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)), thread_local_error_(new ThreadLocalPtr(DeleteThreadLocalErrorContext)),
enable_write_error_injection_(false), enable_write_error_injection_(false),
enable_metadata_write_error_injection_(false), enable_metadata_write_error_injection_(false),
@ -253,6 +254,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
IOStatus DeleteFile(const std::string& f, const IOOptions& options, IOStatus DeleteFile(const std::string& f, const IOOptions& options,
IODebugContext* dbg) override; IODebugContext* dbg) override;
IOStatus GetFileSize(const std::string& f, const IOOptions& options,
uint64_t* file_size, IODebugContext* dbg) override;
IOStatus RenameFile(const std::string& s, const std::string& t, IOStatus RenameFile(const std::string& s, const std::string& t,
const IOOptions& options, IODebugContext* dbg) override; const IOOptions& options, IODebugContext* dbg) override;
@ -347,6 +350,21 @@ class FaultInjectionTestFS : public FileSystemWrapper {
MutexLock l(&mutex_); MutexLock l(&mutex_);
filesystem_writable_ = writable; filesystem_writable_ = writable;
} }
// In places (e.g. GetSortedWals()) RocksDB relies on querying the file size
// or even reading the contents of files currently open for writing, and
// as in POSIX semantics, expects to see the flushed size and contents
// regardless of what has been synced. FaultInjectionTestFS historically
// did not emulate this behavior, only showing synced data from such read
// operations. (Different from FaultInjectionTestEnv--sigh.) Calling this
// function with false restores this historical behavior for testing
// stability, but use of this semantics must be phased out as it is
// inconsistent with expected FileSystem semantics. In other words, this
// functionality is DEPRECATED. Intended to be set after construction and
// unchanged (not thread safe).
void SetReadUnsyncedData(bool read_unsynced_data) {
read_unsynced_data_ = read_unsynced_data;
}
bool ReadUnsyncedData() const { return read_unsynced_data_; }
void AssertNoOpenFile() { assert(open_managed_files_.empty()); } void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
IOStatus GetError() { return error_; } IOStatus GetError() { return error_; }
@ -530,6 +548,9 @@ class FaultInjectionTestFS : public FileSystemWrapper {
// saved callstack // saved callstack
void PrintFaultBacktrace(); void PrintFaultBacktrace();
void AddUnsyncedToRead(const std::string& fname, size_t offset, size_t n,
Slice* result, char* scratch);
private: private:
port::Mutex mutex_; port::Mutex mutex_;
std::map<std::string, FSFileState> db_file_state_; std::map<std::string, FSFileState> db_file_state_;
@ -543,6 +564,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
bool filesystem_active_; // Record flushes, syncs, writes bool filesystem_active_; // Record flushes, syncs, writes
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
// to underlying FS for writable files // to underlying FS for writable files
bool read_unsynced_data_; // See SetReadUnsyncedData()
IOStatus error_; IOStatus error_;
enum ErrorType : int { enum ErrorType : int {