rocksdb/utilities/fault_injection_fs.cc
Hui Xiao ebe2116240 Remove false-positive assertion in FaultInjectionTestFS::RenameFile (#12828)
Summary:
**Context/Summary:**
The assertion `tlist.find(tdn.second) == tlist.end()` 9eebaf11cb/utilities/fault_injection_fs.cc (L1003) can produce false positives.

Some context:
(1) When fault injection is enabled and DB open fails because of it, the crash test will retry the open without injected errors in order to proceed with a clean open:
9eebaf11cb/db_stress_tool/db_stress_test_base.cc (L3559)
9eebaf11cb/db_stress_tool/db_stress_test_base.cc (L3586-L3639)
(2)
a. `FaultInjectionTestFS::dir_to_new_files_since_last_sync_` records files that are created but not yet synced.
b. When we create CURRENT, we first create a temp file and rename it to "CURRENT". As part of the renaming, we [assert](9eebaf11cb/utilities/fault_injection_fs.cc (L1003)) that `FaultInjectionTestFS::dir_to_new_files_since_last_sync_` doesn't already have a file named `CURRENT`.

Suppose the following sequence of events happened:

(1) 1st open, with metadata write error
1. As part of creating the CURRENT file, "CURRENT" was added to `FaultInjectionTestFS::dir_to_new_files_since_last_sync_`
9eebaf11cb/utilities/fault_injection_fs.cc (L735)
2. `SyncDir()` here 9eebaf11cb/file/filename.cc (L412) failed with an injected metadata write error. Therefore, the "CURRENT" file didn't get removed from `FaultInjectionTestFS::dir_to_new_files_since_last_sync_` as it would have if `SyncDir()` had succeeded 9eebaf11cb/utilities/fault_injection_fs.h (L344)

(2) 2nd open
1. Attempted to create a CURRENT file again and failed during the rename, because `FaultInjectionTestFS::dir_to_new_files_since_last_sync_` already had a file called CURRENT, so the following assertion fired:
```
assertion failed - tlist.find(tdn.second) == tlist.end()
```

This PR fixes the issue by removing the assertion. The assertion used to catch missing syncs of some directories (e.g., https://github.com/facebook/rocksdb/pull/10573), so we will keep thinking about a better way to catch those.
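
For illustration only, here is a minimal stand-alone sketch (hypothetical, simplified names; not RocksDB code) of how a stale tracking entry left behind by the failed first open trips the old assertion on the retry:

```
// The map stands in for one directory's entry in
// dir_to_new_files_since_last_sync_.
#include <cassert>
#include <map>
#include <string>

int main() {
  std::map<std::string, std::string> tlist;  // files created since last sync

  // 1st open: the temp file is renamed to CURRENT, which tracks "CURRENT"...
  tlist["CURRENT"] = "";
  // ...but the directory sync that would clear the entry fails with an
  // injected metadata write error, so the entry stays behind.

  // 2nd open (clean retry): CURRENT is created via rename again. The removed
  // assertion would have fired here even though nothing is actually wrong:
  // assert(tlist.find("CURRENT") == tlist.end());  // false positive
  // With the fix, the stale entry is simply overwritten instead.
  tlist["CURRENT"] = "";
  return 0;
}
```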

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12828

Test Plan:
The following command consistently failed before the fix and passed after the PR when run for 10 minutes:
```
python3 tools/db_crashtest.py --simple blackbox --interval=10 --WAL_size_limit_MB=1 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=100 --adaptive_readahead=1 --adm_policy=2 --advise_random_on_open=1 --allow_concurrent_memtable_write=1 --allow_data_in_errors=True --allow_fallocate=1 --async_io=0 --auto_readahead_size=1 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=0 --bgerror_resume_retry_interval=100 --block_align=0 --block_protection_bytes_per_key=8 --block_size=16384 --bloom_before_level=1 --bloom_bits=10 --bottommost_compression_type=lz4hc --bottommost_file_compaction_delay=86400 --bytes_per_sync=0 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=0 --cache_size=8388608 --cache_type=tiered_auto_hyper_clock_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=0 --charge_table_reader=0 --check_multiget_consistency=0 --check_multiget_entity_consistency=0 --checkpoint_one_in=10000 --checksum_type=kCRC32c --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=1000 --compact_range_one_in=1000000 --compaction_pri=3 --compaction_readahead_size=0 --compaction_ttl=1 --compress_format_version=1 --compressed_secondary_cache_ratio=0.5 --compressed_secondary_cache_size=0 --compression_checksum=0 --compression_max_dict_buffer_bytes=15 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zstd --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=65536 --continuous_verification_interval=0 --daily_offpeak_time_utc= --data_block_index_type=1 --db_write_buffer_size=0 --default_temperature=kHot --default_write_temperature=kUnknown --delete_obsolete_files_period_micros=30000000 --delpercent=4 --delrangepercent=1 --destroy_db_initially=0 --detect_filter_construct_corruption=1 --disable_file_deletions_one_in=10000 --disable_manual_compaction_one_in=10000 --disable_wal=0 --dump_malloc_stats=0 --enable_checksum_handoff=1 --enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=0 --enable_index_compression=1 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_sst_partitioner_factory=1 --enable_thread_tracking=1 --enable_write_thread_adaptive_yield=0 --error_recovery_with_no_fault_injection=1 --exclude_wal_from_write_fault_injection=1 --fail_if_options_file_error=1 --fifo_allow_compaction=0 --file_checksum_impl=crc32c --fill_cache=1 --flush_one_in=1000000 --format_version=3 --get_all_column_family_metadata_one_in=1000000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=1000000 --get_properties_of_all_tables_one_in=100000 --get_property_one_in=100000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=2097152 --high_pri_pool_ratio=0 --index_block_restart_interval=2 --index_shortening=0 --index_type=2 --ingest_external_file_one_in=0 --initial_auto_readahead_size=16384 --inplace_update_support=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100000 --last_level_temperature=kWarm --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=10000 --log_file_time_to_roll=60 --log_readahead_size=16777216 --long_running_snapshots=1 --low_pri_pool_ratio=0.5 --lowest_used_cache_tier=1 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=10 
--max_auto_readahead_size=16384 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=1000000 --max_key_len=3 --max_log_file_size=0 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=1 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=2097152 --memtable_insert_hint_per_batch=0 --memtable_max_range_deletions=0 --memtable_prefix_bloom_size_ratio=0.1 --memtable_protection_bytes_per_key=8 --memtable_whole_key_filtering=0 --memtablerep=skip_list --metadata_charge_policy=1 --metadata_read_fault_one_in=32 --metadata_write_fault_one_in=0 --min_write_buffer_number_to_merge=2 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=1 --open_files=-1 --open_metadata_read_fault_one_in=0 --open_metadata_write_fault_one_in=8 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=100000000 --optimize_filters_for_hits=0 --optimize_filters_for_memory=0 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --partition_filters=1 --partition_pinning=3 --pause_background_one_in=1000000 --periodic_compaction_seconds=1000 --prefix_size=5 --prefixpercent=5 --prepopulate_block_cache=1 --preserve_internal_time_seconds=0 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=32 --read_fault_one_in=0 --readahead_size=524288 --readpercent=45 --recycle_log_file_num=0 --reopen=0 --report_bg_io_stats=0 --reset_stats_one_in=1000000 --sample_for_compression=0 --secondary_cache_fault_one_in=32 --secondary_cache_uri= --set_options_one_in=0 --skip_stats_update_on_db_open=0 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=foo --sqfc_version=1 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=10 --stats_history_buffer_size=1048576 --strict_bytes_per_sync=1 --subcompactions=2 --sync=0 --sync_fault_injection=1 --table_cache_numshardbits=6 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=2 --uncache_aggressiveness=1582 --universal_max_read_amp=4 --unpartitioned_pinning=0 --use_adaptive_mutex=0 --use_adaptive_mutex_lru=1 --use_attribute_group=1 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=0 --use_multi_cf_iterator=1 --use_multi_get_entity=1 --use_multiget=0 --use_put_entity_one_in=1 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000 --verify_compression=1 --verify_db_one_in=10000 --verify_file_checksums_one_in=1000 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none --write_buffer_size=33554432 --write_dbid_to_manifest=1 --write_fault_one_in=8 --writepercent=35
```

Reviewed By: cbi42

Differential Revision: D59241548

Pulled By: hx235

fbshipit-source-id: 5bb49e6a94943273f47578a2caf3d08ca5b67e5f
2024-07-09 15:35:54 -07:00

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright 2014 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// This test uses a custom FileSystem to keep track of the state of a file
// system as of the last "Sync". Data being written is cached in a "buffer";
// only when "Sync" is called does the data become persistent. It can simulate
// the loss of file data (or of entire files) not protected by a "Sync". For
// any of the FileSystem-related operations, a specific error can be returned
// when the file system is not activated, by specifying the "IOStatus Error".
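//
// A rough usage sketch (a typical test flow assumed for illustration, not
// part of this file):
//   std::shared_ptr<FaultInjectionTestFS> fs =
//       std::make_shared<FaultInjectionTestFS>(FileSystem::Default());
//   // ... run the code under test against an Env/DB built on top of `fs` ...
//   fs->SetFilesystemActive(false);  // subsequent I/O returns the set error
//   fs->DropUnsyncedFileData();      // simulate losing data never Sync()'ed
//   fs->SetFilesystemActive(true);   // resume normal operation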
#include "utilities/fault_injection_fs.h"
#include <algorithm>
#include <cstdio>
#include <functional>
#include <utility>
#include "env/composite_env_wrapper.h"
#include "port/lang.h"
#include "port/stack_trace.h"
#include "rocksdb/env.h"
#include "rocksdb/io_status.h"
#include "rocksdb/types.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/xxhash.h"
namespace ROCKSDB_NAMESPACE {
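// Marker used in dir_to_new_files_since_last_sync_ for a new file that did
// not overwrite an existing file; such files are deleted (rather than
// restored to previous contents) by DeleteFilesCreatedAfterLastDirSync().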
const std::string kNewFileNoOverwrite;
// Assume a filename, and not a directory name like "/foo/bar/"
std::string TestFSGetDirName(const std::string filename) {
size_t found = filename.find_last_of("/\\");
if (found == std::string::npos) {
return "";
} else {
return filename.substr(0, found);
}
}
// Trim the trailing "/" at the end of `str`
std::string TestFSTrimDirname(const std::string& str) {
size_t found = str.find_last_not_of('/');
if (found == std::string::npos) {
return str;
}
return str.substr(0, found + 1);
}
// Return pair <parent directory name, file name> of a full path.
std::pair<std::string, std::string> TestFSGetDirAndName(
const std::string& name) {
std::string dirname = TestFSGetDirName(name);
std::string fname = name.substr(dirname.size() + 1);
return std::make_pair(dirname, fname);
}
// Calculate the checksum of the data with the corresponding checksum
// type. If the checksum type is not supported, no checksum is returned.
void CalculateTypedChecksum(const ChecksumType& checksum_type, const char* data,
size_t size, std::string* checksum) {
if (checksum_type == ChecksumType::kCRC32c) {
uint32_t v_crc32c = crc32c::Extend(0, data, size);
PutFixed32(checksum, v_crc32c);
return;
} else if (checksum_type == ChecksumType::kxxHash) {
uint32_t v = XXH32(data, size, 0);
PutFixed32(checksum, v);
}
}
IOStatus FSFileState::DropUnsyncedData() {
buffer_.resize(0);
return IOStatus::OK();
}
IOStatus FSFileState::DropRandomUnsyncedData(Random* rand) {
int range = static_cast<int>(buffer_.size());
size_t truncated_size = static_cast<size_t>(rand->Uniform(range));
buffer_.resize(truncated_size);
return IOStatus::OK();
}
IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!s.ok()) {
return s;
}
fs_->SyncDir(dirname_);
s = dir_->Fsync(options, dbg);
return s;
}
IOStatus TestFSDirectory::Close(const IOOptions& options, IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!s.ok()) {
return s;
}
s = dir_->Close(options, dbg);
return s;
}
IOStatus TestFSDirectory::FsyncWithDirOptions(
const IOOptions& options, IODebugContext* dbg,
const DirFsyncOptions& dir_fsync_options) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!s.ok()) {
return s;
}
fs_->SyncDir(dirname_);
s = dir_->FsyncWithDirOptions(options, dbg, dir_fsync_options);
return s;
}
TestFSWritableFile::TestFSWritableFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>&& f,
FaultInjectionTestFS* fs)
: state_(fname),
file_opts_(file_opts),
target_(std::move(f)),
writable_file_opened_(true),
fs_(fs),
unsync_data_loss_(fs_->InjectUnsyncedDataLoss()) {
assert(target_ != nullptr);
state_.pos_at_last_append_ = 0;
}
TestFSWritableFile::~TestFSWritableFile() {
if (writable_file_opened_) {
Close(IOOptions(), nullptr).PermitUncheckedError();
}
}
IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
options, state_.filename_);
if (!s.ok()) {
return s;
}
if (target_->use_direct_io() || !unsync_data_loss_) {
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
s = target_->Append(data, options, dbg);
} else {
state_.buffer_.append(data.data(), data.size());
}
if (s.ok()) {
state_.pos_at_last_append_ += data.size();
fs_->WritableFileAppended(state_);
}
return s;
}
// If IngestDataCorruptionBeforeWrite() has been set, data corruption is
// simulated.
IOStatus TestFSWritableFile::Append(
const Slice& data, const IOOptions& options,
const DataVerificationInfo& verification_info, IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
if (fs_->ShouldDataCorruptionBeforeWrite()) {
return IOStatus::Corruption("Data is corrupted!");
}
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
options, state_.filename_);
if (!s.ok()) {
return s;
}
// Calculate the checksum
std::string checksum;
CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
data.size(), &checksum);
if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
checksum != verification_info.checksum.ToString()) {
std::string msg =
"Data is corrupted! Origin data checksum: " +
verification_info.checksum.ToString(true) +
"current data checksum: " + Slice(checksum).ToString(true);
return IOStatus::Corruption(msg);
}
if (target_->use_direct_io() || !unsync_data_loss_) {
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
s = target_->Append(data, options, dbg);
} else {
state_.buffer_.append(data.data(), data.size());
}
if (s.ok()) {
state_.pos_at_last_append_ += data.size();
fs_->WritableFileAppended(state_);
}
return s;
}
IOStatus TestFSWritableFile::Truncate(uint64_t size, const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
options, state_.filename_);
if (!s.ok()) {
return s;
}
s = target_->Truncate(size, options, dbg);
if (s.ok()) {
state_.pos_at_last_append_ = size;
}
return s;
}
IOStatus TestFSWritableFile::PositionedAppend(const Slice& data,
uint64_t offset,
const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
if (fs_->ShouldDataCorruptionBeforeWrite()) {
return IOStatus::Corruption("Data is corrupted!");
}
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
options, state_.filename_);
if (!s.ok()) {
return s;
}
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
s = target_->PositionedAppend(data, offset, options, dbg);
if (s.ok()) {
state_.pos_at_last_append_ = offset + data.size();
fs_->WritableFileAppended(state_);
}
return s;
}
IOStatus TestFSWritableFile::PositionedAppend(
const Slice& data, uint64_t offset, const IOOptions& options,
const DataVerificationInfo& verification_info, IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
if (fs_->ShouldDataCorruptionBeforeWrite()) {
return IOStatus::Corruption("Data is corrupted!");
}
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
options, state_.filename_);
if (!s.ok()) {
return s;
}
// Calculate the checksum
std::string checksum;
CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(),
data.size(), &checksum);
if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum &&
checksum != verification_info.checksum.ToString()) {
std::string msg =
"Data is corrupted! Origin data checksum: " +
verification_info.checksum.ToString(true) +
"current data checksum: " + Slice(checksum).ToString(true);
return IOStatus::Corruption(msg);
}
// TODO(hx235): buffer data for direct IO write to simulate data loss like
// non-direct IO write
s = target_->PositionedAppend(data, offset, options, dbg);
if (s.ok()) {
state_.pos_at_last_append_ = offset + data.size();
fs_->WritableFileAppended(state_);
}
return s;
}
IOStatus TestFSWritableFile::Close(const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
fs_->WritableFileClosed(state_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus io_s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!io_s.ok()) {
return io_s;
}
writable_file_opened_ = false;
// Drop buffered data that was never synced because close is not a syncing
// mechanism in POSIX file semantics.
state_.buffer_.resize(0);
io_s = target_->Close(options, dbg);
return io_s;
}
IOStatus TestFSWritableFile::Flush(const IOOptions&, IODebugContext*) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
return IOStatus::OK();
}
IOStatus TestFSWritableFile::Sync(const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
if (target_->use_direct_io()) {
// For Direct IO mode, we don't buffer anything in TestFSWritableFile.
// So just return
return IOStatus::OK();
}
IOStatus io_s = target_->Append(state_.buffer_, options, dbg);
state_.buffer_.resize(0);
// Ignore sync errors
target_->Sync(options, dbg).PermitUncheckedError();
state_.pos_at_last_sync_ = state_.pos_at_last_append_;
fs_->WritableFileSynced(state_);
return io_s;
}
IOStatus TestFSWritableFile::RangeSync(uint64_t offset, uint64_t nbytes,
const IOOptions& options,
IODebugContext* dbg) {
MutexLock l(&mutex_);
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
// Assumes caller passes consecutive byte ranges.
uint64_t sync_limit = offset + nbytes;
uint64_t buf_begin =
state_.pos_at_last_sync_ < 0 ? 0 : state_.pos_at_last_sync_;
IOStatus io_s;
if (sync_limit < buf_begin) {
return io_s;
}
uint64_t num_to_sync = std::min(static_cast<uint64_t>(state_.buffer_.size()),
sync_limit - buf_begin);
Slice buf_to_sync(state_.buffer_.data(), num_to_sync);
io_s = target_->Append(buf_to_sync, options, dbg);
state_.buffer_ = state_.buffer_.substr(num_to_sync);
// Ignore sync errors
target_->RangeSync(offset, nbytes, options, dbg).PermitUncheckedError();
state_.pos_at_last_sync_ = offset + num_to_sync;
fs_->WritableFileSynced(state_);
return io_s;
}
TestFSRandomRWFile::TestFSRandomRWFile(const std::string& /*fname*/,
std::unique_ptr<FSRandomRWFile>&& f,
FaultInjectionTestFS* fs)
: target_(std::move(f)), file_opened_(true), fs_(fs) {
assert(target_ != nullptr);
}
TestFSRandomRWFile::~TestFSRandomRWFile() {
if (file_opened_) {
Close(IOOptions(), nullptr).PermitUncheckedError();
}
}
IOStatus TestFSRandomRWFile::Write(uint64_t offset, const Slice& data,
const IOOptions& options,
IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
return target_->Write(offset, data, options, dbg);
}
IOStatus TestFSRandomRWFile::Read(uint64_t offset, size_t n,
const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) const {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
// TODO (low priority): fs_->ReadUnsyncedData()
return target_->Read(offset, n, options, result, scratch, dbg);
}
IOStatus TestFSRandomRWFile::Close(const IOOptions& options,
IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
file_opened_ = false;
return target_->Close(options, dbg);
}
IOStatus TestFSRandomRWFile::Flush(const IOOptions& options,
IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
return target_->Flush(options, dbg);
}
IOStatus TestFSRandomRWFile::Sync(const IOOptions& options,
IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
return target_->Sync(options, dbg);
}
TestFSRandomAccessFile::TestFSRandomAccessFile(
const std::string& /*fname*/, std::unique_ptr<FSRandomAccessFile>&& f,
FaultInjectionTestFS* fs)
: target_(std::move(f)), fs_(fs) {
assert(target_ != nullptr);
}
IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
const IOOptions& options, Slice* result,
char* scratch,
IODebugContext* dbg) const {
TEST_SYNC_POINT("FaultInjectionTestFS::RandomRead");
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, options, "",
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
scratch, /*need_count_increase=*/true,
/*fault_injected=*/nullptr);
if (!s.ok()) {
return s;
}
s = target_->Read(offset, n, options, result, scratch, dbg);
// TODO (low priority): fs_->ReadUnsyncedData()
return s;
}
IOStatus TestFSRandomAccessFile::ReadAsync(
FSReadRequest& req, const IOOptions& opts,
std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
void** io_handle, IOHandleDeleter* del_fn, IODebugContext* /*dbg*/) {
IOStatus res_status;
FSReadRequest res;
IOStatus s;
if (!fs_->IsFilesystemActive()) {
res_status = fs_->GetError();
}
if (res_status.ok()) {
res_status = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, opts, "",
FaultInjectionTestFS::ErrorOperation::kRead, &res.result,
use_direct_io(), req.scratch, /*need_count_increase=*/true,
/*fault_injected=*/nullptr);
}
if (res_status.ok()) {
s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
// TODO (low priority): fs_->ReadUnsyncedData()
} else {
// If there's no injected error, then cb will be called asynchronously when
// target_ actually finishes the read. But if there's an injected error, we
// need to immediately call cb(res, cb_arg) since target_->ReadAsync()
// isn't invoked at all.
res.status = res_status;
cb(res, cb_arg);
}
// We return ReadAsync()'s status instead of the injected error status here
// since the return status is not supposed to be the status of the actual IO
// (i.e., the actual async read). The actual status of the IO will be passed
// to the cb() callback when the actual read finishes, or, as above, when an
// injected error happens.
return s;
}
IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
const IOOptions& options,
IODebugContext* dbg) {
if (!fs_->IsFilesystemActive()) {
return fs_->GetError();
}
IOStatus s = target_->MultiRead(reqs, num_reqs, options, dbg);
// TODO (low priority): fs_->ReadUnsyncedData()
bool injected_error = false;
for (size_t i = 0; i < num_reqs; i++) {
if (!reqs[i].status.ok()) {
// Already seeing an error.
break;
}
bool this_injected_error;
reqs[i].status = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, options, "",
FaultInjectionTestFS::ErrorOperation::kRead, &(reqs[i].result),
use_direct_io(), reqs[i].scratch,
/*need_count_increase=*/true,
/*fault_injected=*/&this_injected_error);
injected_error |= this_injected_error;
}
if (s.ok()) {
s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, options, "",
FaultInjectionTestFS::ErrorOperation::kMultiRead, nullptr,
use_direct_io(), nullptr, /*need_count_increase=*/!injected_error,
/*fault_injected=*/nullptr);
}
return s;
}
size_t TestFSRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
if (fs_->ShouldFailGetUniqueId()) {
return 0;
} else {
return target_->GetUniqueId(id, max_size);
}
}
void FaultInjectionTestFS::AddUnsyncedToRead(const std::string& fname,
size_t pos, size_t n,
Slice* result, char* scratch) {
// Should have been checked by the caller
assert(result->size() < n);
size_t pos_after = pos + result->size();
MutexLock l(&mutex_);
auto it = db_file_state_.find(fname);
if (it != db_file_state_.end()) {
auto& st = it->second;
if (st.pos_at_last_append_ > static_cast<ssize_t>(pos_after)) {
size_t remaining_requested = n - result->size();
size_t to_copy =
std::min(remaining_requested,
static_cast<size_t>(st.pos_at_last_append_) - pos_after);
size_t buffer_offset = pos_after - static_cast<size_t>(std::max(
st.pos_at_last_sync_, ssize_t{0}));
// Data might have been dropped from buffer
if (st.buffer_.size() > buffer_offset) {
to_copy = std::min(to_copy, st.buffer_.size() - buffer_offset);
if (result->data() != scratch) {
// TODO: this will be needed when supporting random reads
// but not currently used
abort();
// NOTE: might overlap
// std::copy_n(result->data(), result->size(), scratch);
}
std::copy_n(st.buffer_.data() + buffer_offset, to_copy,
scratch + result->size());
*result = Slice(scratch, result->size() + to_copy);
}
}
}
}
IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) {
IOStatus s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, options, "",
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
scratch, true /*need_count_increase=*/, nullptr /* fault_injected*/);
if (!s.ok()) {
return s;
}
s = target()->Read(n, options, result, scratch, dbg);
if (!s.ok()) {
return s;
}
if (fs_->ReadUnsyncedData() && result->size() < n) {
fs_->AddUnsyncedToRead(fname_, read_pos_, n, result, scratch);
}
read_pos_ += result->size();
return s;
}
IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
const IOOptions& options,
Slice* result, char* scratch,
IODebugContext* dbg) {
IOStatus s = fs_->MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, options, "",
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
scratch, true /*need_count_increase=*/, nullptr /* fault_injected */);
if (!s.ok()) {
return s;
}
s = target()->PositionedRead(offset, n, options, result, scratch, dbg);
// TODO (low priority): fs_->ReadUnsyncedData()
return s;
}
IOStatus FaultInjectionTestFS::NewDirectory(
const std::string& name, const IOOptions& options,
std::unique_ptr<FSDirectory>* result, IODebugContext* dbg) {
std::unique_ptr<FSDirectory> r;
IOStatus io_s = target()->NewDirectory(name, options, &r, dbg);
if (!io_s.ok()) {
return io_s;
}
result->reset(
new TestFSDirectory(this, TestFSTrimDirname(name), r.release()));
return IOStatus::OK();
}
IOStatus FaultInjectionTestFS::FileExists(const std::string& fname,
const IOOptions& options,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->FileExists(fname, options, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::GetChildren(const std::string& dir,
const IOOptions& options,
std::vector<std::string>* result,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->GetChildren(dir, options, result, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::GetChildrenFileAttributes(
const std::string& dir, const IOOptions& options,
std::vector<FileAttributes>* result, IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->GetChildrenFileAttributes(dir, options, result, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::NewWritableFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
if (IsFilesystemDirectWritable()) {
return target()->NewWritableFile(fname, file_opts, result, dbg);
}
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
file_opts.io_options, fname);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->NewWritableFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(
new TestFSWritableFile(fname, file_opts, std::move(*result), this));
// If the WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
{
MutexLock l(&mutex_);
open_managed_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
// The new file could overwrite an old one. Here we simplify
// the implementation by assuming no file of this name after
// dropping unsynced files.
list[dir_and_name.second] = kNewFileNoOverwrite;
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::ReopenWritableFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSWritableFile>* result, IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
if (IsFilesystemDirectWritable()) {
return target()->ReopenWritableFile(fname, file_opts, result, dbg);
}
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
file_opts.io_options, fname);
if (!io_s.ok()) {
return io_s;
}
bool exists;
IOStatus exists_s =
target()->FileExists(fname, IOOptions(), nullptr /* dbg */);
if (exists_s.IsNotFound()) {
exists = false;
} else if (exists_s.ok()) {
exists = true;
} else {
io_s = exists_s;
exists = false;
}
if (!io_s.ok()) {
return io_s;
}
io_s = target()->ReopenWritableFile(fname, file_opts, result, dbg);
// Only track files we created. Files created outside of this
// `FaultInjectionTestFS` are not eligible for tracking/data dropping
// (for example, they may contain data a previous db_stress run expects to
// be recovered). This could be extended to track/drop data appended once
// the file is under `FaultInjectionTestFS`'s control.
if (io_s.ok()) {
bool should_track;
{
MutexLock l(&mutex_);
if (db_file_state_.find(fname) != db_file_state_.end()) {
// It was written by this `FileSystem` earlier.
assert(exists);
should_track = true;
} else if (!exists) {
// It was created by this `FileSystem` just now.
should_track = true;
open_managed_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
list[dir_and_name.second] = kNewFileNoOverwrite;
} else {
should_track = false;
}
}
if (should_track) {
result->reset(
new TestFSWritableFile(fname, file_opts, std::move(*result), this));
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::ReuseWritableFile(
const std::string& fname, const std::string& old_fname,
const FileOptions& file_opts, std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) {
IOStatus s = RenameFile(old_fname, fname, file_opts.io_options, dbg);
if (!s.ok()) {
return s;
}
return NewWritableFile(fname, file_opts, result, dbg);
}
IOStatus FaultInjectionTestFS::NewRandomRWFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSRandomRWFile>* result, IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
if (IsFilesystemDirectWritable()) {
return target()->NewRandomRWFile(fname, file_opts, result, dbg);
}
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
file_opts.io_options, fname);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->NewRandomRWFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSRandomRWFile(fname, std::move(*result), this));
// If the WritableFileWriter* file is opened
// again then it will be truncated - so forget our saved state.
UntrackFile(fname);
{
MutexLock l(&mutex_);
open_managed_files_.insert(fname);
auto dir_and_name = TestFSGetDirAndName(fname);
auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first];
// It could be overwriting an old file, but we simplify the
// implementation by ignoring it.
list[dir_and_name.second] = kNewFileNoOverwrite;
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::NewRandomAccessFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSRandomAccessFile>* result, IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s = MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, file_opts.io_options, fname,
ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
nullptr /* scratch */, true /*need_count_increase*/,
nullptr /*fault_injected*/);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSRandomAccessFile(fname, std::move(*result), this));
}
return io_s;
}
IOStatus FaultInjectionTestFS::NewSequentialFile(
const std::string& fname, const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result, IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s = MaybeInjectThreadLocalError(
FaultInjectionIOType::kRead, file_opts.io_options, fname,
ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
nullptr /* scratch */, true /*need_count_increase*/,
nullptr /*fault_injected*/);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->NewSequentialFile(fname, file_opts, result, dbg);
if (io_s.ok()) {
result->reset(new TestFSSequentialFile(std::move(*result), this, fname));
}
return io_s;
}
IOStatus FaultInjectionTestFS::DeleteFile(const std::string& f,
const IOOptions& options,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s = MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!io_s.ok()) {
return io_s;
}
io_s = FileSystemWrapper::DeleteFile(f, options, dbg);
if (io_s.ok()) {
UntrackFile(f);
}
return io_s;
}
IOStatus FaultInjectionTestFS::GetFileSize(const std::string& f,
const IOOptions& options,
uint64_t* file_size,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->GetFileSize(f, options, file_size, dbg);
if (!io_s.ok()) {
return io_s;
}
if (ReadUnsyncedData()) {
// Need to report flushed size, not synced size
MutexLock l(&mutex_);
auto it = db_file_state_.find(f);
if (it != db_file_state_.end()) {
*file_size = it->second.pos_at_last_append_;
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::GetFileModificationTime(const std::string& fname,
const IOOptions& options,
uint64_t* file_mtime,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->GetFileModificationTime(fname, options, file_mtime, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::RenameFile(const std::string& s,
const std::string& t,
const IOOptions& options,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s = MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!io_s.ok()) {
return io_s;
}
// We preserve contents of overwritten files up to a size threshold.
// We could keep the previous file under another name, but then we would need
// to worry about garbage collecting those files. We can do that if it is
// needed later.
// We ignore I/O errors here for simplicity.
std::string previous_contents = kNewFileNoOverwrite;
if (target()->FileExists(t, IOOptions(), nullptr).ok()) {
uint64_t file_size;
if (target()->GetFileSize(t, IOOptions(), &file_size, nullptr).ok() &&
file_size < 1024) {
ReadFileToString(target(), t, &previous_contents).PermitUncheckedError();
}
}
io_s = FileSystemWrapper::RenameFile(s, t, options, dbg);
if (io_s.ok()) {
{
MutexLock l(&mutex_);
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
db_file_state_.erase(s);
}
auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
tlist[tdn.second] = previous_contents;
}
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::LinkFile(const std::string& s,
const std::string& t,
const IOOptions& options,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s = MaybeInjectThreadLocalError(
FaultInjectionIOType::kMetadataWrite, options);
if (!io_s.ok()) {
return io_s;
}
// Using the value in `dir_to_new_files_since_last_sync_` for the source file
// may be a more reasonable choice.
std::string previous_contents = kNewFileNoOverwrite;
io_s = FileSystemWrapper::LinkFile(s, t, options, dbg);
if (io_s.ok()) {
{
MutexLock l(&mutex_);
if (!allow_link_open_file_ &&
open_managed_files_.find(s) != open_managed_files_.end()) {
fprintf(stderr, "Attempt to LinkFile while open for write: %s\n",
s.c_str());
abort();
}
if (db_file_state_.find(s) != db_file_state_.end()) {
db_file_state_[t] = db_file_state_[s];
}
auto sdn = TestFSGetDirAndName(s);
auto tdn = TestFSGetDirAndName(t);
if (dir_to_new_files_since_last_sync_[sdn.first].find(sdn.second) !=
dir_to_new_files_since_last_sync_[sdn.first].end()) {
auto& tlist = dir_to_new_files_since_last_sync_[tdn.first];
assert(tlist.find(tdn.second) == tlist.end());
tlist[tdn.second] = previous_contents;
}
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::NumFileLinks(const std::string& fname,
const IOOptions& options,
uint64_t* count,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->NumFileLinks(fname, options, count, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::AreFilesSame(const std::string& first,
const std::string& second,
const IOOptions& options, bool* res,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->AreFilesSame(first, second, options, res, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::GetAbsolutePath(const std::string& db_path,
const IOOptions& options,
std::string* output_path,
IODebugContext* dbg) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->GetAbsolutePath(db_path, options, output_path, dbg);
return io_s;
}
IOStatus FaultInjectionTestFS::IsDirectory(const std::string& path,
const IOOptions& options,
bool* is_dir, IODebugContext* dgb) {
if (!IsFilesystemActive()) {
return GetError();
}
IOStatus io_s =
MaybeInjectThreadLocalError(FaultInjectionIOType::kMetadataRead, options);
if (!io_s.ok()) {
return io_s;
}
io_s = target()->IsDirectory(path, options, is_dir, dgb);
return io_s;
}
IOStatus FaultInjectionTestFS::Poll(std::vector<void*>& io_handles,
size_t min_completions) {
return target()->Poll(io_handles, min_completions);
}
IOStatus FaultInjectionTestFS::AbortIO(std::vector<void*>& io_handles) {
return target()->AbortIO(io_handles);
}
void FaultInjectionTestFS::WritableFileClosed(const FSFileState& state) {
MutexLock l(&mutex_);
if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
db_file_state_[state.filename_] = state;
open_managed_files_.erase(state.filename_);
}
}
void FaultInjectionTestFS::WritableFileSynced(const FSFileState& state) {
MutexLock l(&mutex_);
if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state));
} else {
db_file_state_[state.filename_] = state;
}
}
}
void FaultInjectionTestFS::WritableFileAppended(const FSFileState& state) {
MutexLock l(&mutex_);
if (open_managed_files_.find(state.filename_) != open_managed_files_.end()) {
if (db_file_state_.find(state.filename_) == db_file_state_.end()) {
db_file_state_.insert(std::make_pair(state.filename_, state));
} else {
db_file_state_[state.filename_] = state;
}
}
}
IOStatus FaultInjectionTestFS::DropUnsyncedFileData() {
IOStatus io_s;
MutexLock l(&mutex_);
for (std::map<std::string, FSFileState>::iterator it = db_file_state_.begin();
io_s.ok() && it != db_file_state_.end(); ++it) {
FSFileState& fs_state = it->second;
if (!fs_state.IsFullySynced()) {
io_s = fs_state.DropUnsyncedData();
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::DropRandomUnsyncedFileData(Random* rnd) {
IOStatus io_s;
MutexLock l(&mutex_);
for (std::map<std::string, FSFileState>::iterator it = db_file_state_.begin();
io_s.ok() && it != db_file_state_.end(); ++it) {
FSFileState& fs_state = it->second;
if (!fs_state.IsFullySynced()) {
io_s = fs_state.DropRandomUnsyncedData(rnd);
}
}
return io_s;
}
IOStatus FaultInjectionTestFS::DeleteFilesCreatedAfterLastDirSync(
const IOOptions& options, IODebugContext* dbg) {
// Because DeleteFile accesses this container, make a copy to avoid deadlock.
std::map<std::string, std::map<std::string, std::string>> map_copy;
{
MutexLock l(&mutex_);
map_copy.insert(dir_to_new_files_since_last_sync_.begin(),
dir_to_new_files_since_last_sync_.end());
}
for (auto& pair : map_copy) {
for (auto& file_pair : pair.second) {
if (file_pair.second == kNewFileNoOverwrite) {
IOStatus io_s =
DeleteFile(pair.first + "/" + file_pair.first, options, dbg);
if (!io_s.ok()) {
return io_s;
}
} else {
IOOptions opts;
IOStatus io_s =
WriteStringToFile(target(), file_pair.second,
pair.first + "/" + file_pair.first, true, opts);
if (!io_s.ok()) {
return io_s;
}
}
}
}
return IOStatus::OK();
}
void FaultInjectionTestFS::ResetState() {
MutexLock l(&mutex_);
db_file_state_.clear();
dir_to_new_files_since_last_sync_.clear();
SetFilesystemActiveNoLock(true);
}
void FaultInjectionTestFS::UntrackFile(const std::string& f) {
MutexLock l(&mutex_);
auto dir_and_name = TestFSGetDirAndName(f);
dir_to_new_files_since_last_sync_[dir_and_name.first].erase(
dir_and_name.second);
db_file_state_.erase(f);
open_managed_files_.erase(f);
}
IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError(
const IOOptions& io_options, ErrorOperation op, Slice* result,
bool direct_io, char* scratch, bool need_count_increase,
bool* fault_injected) {
bool dummy_bool;
bool& ret_fault_injected = fault_injected ? *fault_injected : dummy_bool;
ret_fault_injected = false;
ErrorContext* ctx =
static_cast<ErrorContext*>(injected_thread_local_read_error_.Get());
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
ShouldIOActivtiesExcludedFromFaultInjection(io_options.io_activity)) {
return IOStatus::OK();
}
IOStatus ret;
if (ctx->rand.OneIn(ctx->one_in)) {
if (ctx->count == 0) {
ctx->message = "";
}
if (need_count_increase) {
ctx->count++;
}
if (ctx->callstack) {
free(ctx->callstack);
}
ctx->callstack = port::SaveStack(&ctx->frames);
if (op != ErrorOperation::kMultiReadSingleReq) {
// Likely a non-per-read status code for MultiRead
ctx->message += "injected read error; ";
ret_fault_injected = true;
ret = IOStatus::IOError(ctx->message);
} else if (Random::GetTLSInstance()->OneIn(8)) {
assert(result);
// For a small chance, set the failure to status but turn the
// result to be empty, which is supposed to be caught for a check.
*result = Slice();
ctx->message += "injected empty result; ";
ret_fault_injected = true;
} else if (!direct_io && Random::GetTLSInstance()->OneIn(7) &&
scratch != nullptr && result->data() == scratch) {
assert(result);
// With direct I/O, many extra bytes might be read so corrupting
// one byte might not cause checksum mismatch. Skip checksum
// corruption injection.
// We only corrupt data if the result is filled to `scratch`. For other
// cases, the data might not be modifiable (e.g., mmapped files)
// or has unintended side effects.
// For a small chance, set the failure to status but corrupt the
// result in a way that checksum checking is supposed to fail.
// Corrupt the last byte, which is supposed to be a checksum byte
// It would work for CRC. Not 100% sure for xxhash and will adjust
// if it is not the case.
const_cast<char*>(result->data())[result->size() - 1]++;
ctx->message += "injected corrupt last byte; ";
ret_fault_injected = true;
} else {
ctx->message += "injected error result multiget single; ";
ret_fault_injected = true;
ret = IOStatus::IOError(ctx->message);
}
}
ret.SetRetryable(ctx->retryable);
ret.SetDataLoss(ctx->has_data_loss);
return ret;
}
bool FaultInjectionTestFS::TryParseFileName(const std::string& file_name,
uint64_t* number, FileType* type) {
std::size_t found = file_name.find_last_of('/');
std::string file = file_name.substr(found);
return ParseFileName(file, number, type);
}
IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError(
FaultInjectionIOType type, const IOOptions& io_options,
const std::string& file_name, ErrorOperation op, Slice* result,
bool direct_io, char* scratch, bool need_count_increase,
bool* fault_injected) {
if (type == FaultInjectionIOType::kRead) {
return MaybeInjectThreadLocalReadError(io_options, op, result, direct_io,
scratch, need_count_increase,
fault_injected);
}
ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
ShouldIOActivtiesExcludedFromFaultInjection(io_options.io_activity) ||
(type == FaultInjectionIOType::kWrite &&
ShouldExcludeFromWriteFaultInjection(file_name))) {
return IOStatus::OK();
}
IOStatus ret;
if (ctx->rand.OneIn(ctx->one_in)) {
ctx->count++;
if (ctx->callstack) {
free(ctx->callstack);
}
ctx->callstack = port::SaveStack(&ctx->frames);
ctx->message = GetErrorMessageFromFaultInjectionIOType(type);
ret = IOStatus::IOError(ctx->message);
ret.SetRetryable(ctx->retryable);
ret.SetDataLoss(ctx->has_data_loss);
if (type == FaultInjectionIOType::kWrite) {
TEST_SYNC_POINT(
"FaultInjectionTestFS::InjectMetadataWriteError:Injected");
}
}
return ret;
}
void FaultInjectionTestFS::PrintInjectedThreadLocalErrorBacktrace(
FaultInjectionIOType type) {
#if defined(OS_LINUX)
ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
if (ctx) {
if (type == FaultInjectionIOType::kRead) {
fprintf(stderr, "Injected read error type = %d\n", ctx->type);
}
fprintf(stderr, "Message: %s\n", ctx->message.c_str());
port::PrintAndFreeStack(ctx->callstack, ctx->frames);
ctx->callstack = nullptr;
}
#else
(void)type;
#endif
}
} // namespace ROCKSDB_NAMESPACE