mirror of https://github.com/facebook/rocksdb.git
Decouple sync fault and write injection in FaultInjectionTestFS & fix tracing issue under WAL write error injection (#12797)
Summary:
**Context/Summary:**
After injecting write errors into the WAL, we started to see crash recovery verification failures in prefix recovery. That's because the current tracing implementation traces every write before it is written to the WAL, even though the WAL write can fail under write error injection. One consequence is that the traced writes in the trace files no longer correspond to the write sequence, e.g., there are more traced writes than sequence numbers actually assigned to successful writes. Therefore b4a84efb4e/db_stress_tool/expected_state.cc (L674)
won't restore the ExpectedState to the correct sequence number we want.
Ideally, we should have a prepare-commit mechanism for tracing, just like our ExpectedState, so we can ignore a traced write if the write fails later. But for now, to simplify, we simply don't inject WAL write errors (or metadata write errors, because they can fail a write when syncing the WAL directory fails).
To do so, we need to be able to exclude the WAL from write injection while still allowing sync fault injection in it, to maintain its original sync fault testing coverage. This prompts us to decouple sync fault and write injection in FaultInjectionTestFS, which is what this PR is mainly about.
So now `FaultInjectionTestFS` works as the following:
- If direct_writable is true, then `FaultInjectionTestFS` is bypassed for writable file
- Otherwise, `FaultInjectionTestFS` can buffer data for sync fault injection (if inject_unsynced_data_loss_ == true, global settings) and/or inject write errors (if MaybeInjectThreadLocalError(), thread-local settings). WAL files can optionally be excluded from write injection
Bonus: better naming of relevant variables
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12797
Test Plan:
- The following command failed before this fix but passes after
```
python3 tools/db_crashtest.py --simple blackbox \
--interval=5 \
--preserve_unverified_changes=1 \
--threads=32 \
--disable_auto_compactions=1 \
--WAL_size_limit_MB=0 --WAL_ttl_seconds=0 --acquire_snapshot_one_in=0 --adaptive_readahead=0 --adm_policy=0 --advise_random_on_open=1 --allow_concurrent_memtable_write=0 --allow_data_in_errors=True --allow_fallocate=1 --async_io=0 --auto_readahead_size=0 --avoid_flush_during_recovery=1 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=0 --batch_protection_bytes_per_key=0 --bgerror_resume_retry_interval=1000000 --block_align=0 --block_protection_bytes_per_key=4 --block_size=16384 --bloom_before_level=2147483646 --bloom_bits=3.2003682301518492 --bottommost_compression_type=zlib --bottommost_file_compaction_delay=600 --bytes_per_sync=0 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=1 --cache_size=33554432 --cache_type=fixed_hyper_clock_cache --charge_compression_dictionary_building_buffer=0 --charge_file_metadata=0 --charge_filter_construction=0 --charge_table_reader=1 --check_multiget_consistency=0 --check_multiget_entity_consistency=0 --checkpoint_one_in=0 --checksum_type=kxxHash64 --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=0 --compact_range_one_in=0 --compaction_pri=2 --compaction_readahead_size=0 --compaction_ttl=0 --compress_format_version=1 --compressed_secondary_cache_size=16777216 --compression_checksum=1 --compression_max_dict_buffer_bytes=549755813887 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=none --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --daily_offpeak_time_utc=00:00-23:59 --data_block_index_type=0 \
--db_write_buffer_size=0 --delete_obsolete_files_period_micros=0 --delpercent=0 --delrangepercent=0 --destroy_db_initially=0 --detect_filter_construct_corruption=0 --disable_file_deletions_one_in=0 --disable_manual_compaction_one_in=0 --disable_wal=0 --dump_malloc_stats=0 --enable_checksum_handoff=0 --enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=1 --enable_index_compression=0 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_sst_partitioner_factory=0 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=0 --error_recovery_with_no_fault_injection=0 --fail_if_options_file_error=0 --fifo_allow_compaction=1 --file_checksum_impl=xxh64 --fill_cache=0 --flush_one_in=100 --format_version=4 --get_all_column_family_metadata_one_in=0 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=0 --get_properties_of_all_tables_one_in=0 --get_property_one_in=0 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0.5 --index_block_restart_interval=9 --index_shortening=1 --index_type=0 --ingest_external_file_one_in=0 --initial_auto_readahead_size=0 --inplace_update_support=0 --iterpercent=0 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=0 --last_level_temperature=kUnknown --level_compaction_dynamic_level_bytes=1 --lock_wal_one_in=0 --log2_keys_per_lock=10 --log_file_time_to_roll=0 --log_readahead_size=16777216 --long_running_snapshots=0 --low_pri_pool_ratio=0 --lowest_used_cache_tier=2 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=524288 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=1000 --max_key_len=3 --memtable_insert_hint_per_batch=0 --memtable_max_range_deletions=0 --memtable_prefix_bloom_size_ratio=0.5 --memtable_protection_bytes_per_key=8 --memtable_whole_key_filtering=0 --memtablerep=skip_list 
--metadata_charge_policy=0 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=0 --min_write_buffer_number_to_merge=1 --mmap_read=0 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=-1 --open_metadata_read_fault_one_in=0 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=20000000 \
--optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=0 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=3 --pause_background_one_in=0 --periodic_compaction_seconds=0 --prefix_size=1 --prefixpercent=0 --prepopulate_block_cache=0 --preserve_internal_time_seconds=0 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=0 --readahead_size=0 --readpercent=0 --recycle_log_file_num=0 --reopen=0 --report_bg_io_stats=0 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --secondary_cache_uri= --skip_stats_update_on_db_open=0 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=bar --sqfc_version=1 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=10 --stats_history_buffer_size=0 --strict_bytes_per_sync=0 --subcompactions=1 --sync=0 --sync_fault_injection=1 --table_cache_numshardbits=0 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=3 --uncache_aggressiveness=9890 --universal_max_read_amp=-1 --unpartitioned_pinning=3 --use_adaptive_mutex=0 --use_adaptive_mutex_lru=1 --use_attribute_group=0 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=0 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=0 --use_put_entity_one_in=0 --use_sqfc_for_range_queries=0 --use_timed_put_one_in=0 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=0 --verify_checksum_one_in=0 --verify_compression=1 --verify_db_one_in=0 --verify_file_checksums_one_in=0 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=zstd --write_buffer_size=335544320 --write_dbid_to_manifest=1 --write_fault_one_in=100 
--writepercent=100
```
- CI
Reviewed By: cbi42
Differential Revision: D58917145
Pulled By: hx235
fbshipit-source-id: b6397036bea035a92341c2b05fb01872db2153d7
This commit is contained in:
parent
41c6b4549a
commit
0d93c8a6ca
|
@ -159,13 +159,13 @@ bool RunStressTestImpl(SharedState* shared) {
|
|||
stress->TrackExpectedState(shared);
|
||||
}
|
||||
|
||||
// Since wrie fault and sync fault implementations are coupled with each
|
||||
// other in `TestFSWritableFile()`, we can not enable or disable only one
|
||||
// of the two.
|
||||
// TODO(hx235): decouple implementations of write fault injection and sync
|
||||
// fault injection.
|
||||
if (FLAGS_sync_fault_injection || FLAGS_write_fault_one_in > 0) {
|
||||
fault_fs_guard->SetFilesystemDirectWritable(false);
|
||||
fault_fs_guard->SetInjectUnsyncedDataLoss(FLAGS_sync_fault_injection);
|
||||
if (FLAGS_exclude_wal_from_write_fault_injection) {
|
||||
fault_fs_guard->SetFileTypesExcludedFromWriteFaultInjection(
|
||||
{FileType::kWalFile});
|
||||
}
|
||||
}
|
||||
now = clock->NowMicros();
|
||||
fprintf(stdout, "%s Starting database operations\n",
|
||||
|
|
|
@ -1095,6 +1095,9 @@ DEFINE_int32(write_fault_one_in, 0,
|
|||
"On non-zero, enables fault injection on write. Currently only"
|
||||
"injects write error when writing to SST files.");
|
||||
|
||||
DEFINE_bool(exclude_wal_from_write_fault_injection, false,
|
||||
"If true, we won't inject write fault when writing to WAL file");
|
||||
|
||||
DEFINE_int32(metadata_write_fault_one_in, 1000,
|
||||
"On non-zero, enables fault injection on metadata write (i.e, "
|
||||
"directory and file metadata write)");
|
||||
|
|
|
@ -284,7 +284,7 @@ class DbStressListener : public EventListener {
|
|||
FaultInjectionIOType::kMetadataWrite);
|
||||
// TODO(hx235): only exempt the flush thread during error recovery instead
|
||||
// of all the flush threads from error injection
|
||||
fault_fs_guard->SetIOActivtiesExemptedFromFaultInjection(
|
||||
fault_fs_guard->SetIOActivtiesExcludedFromFaultInjection(
|
||||
{Env::IOActivity::kFlush});
|
||||
}
|
||||
}
|
||||
|
@ -300,7 +300,7 @@ class DbStressListener : public EventListener {
|
|||
FaultInjectionIOType::kMetadataRead);
|
||||
fault_fs_guard->EnableThreadLocalErrorInjection(
|
||||
FaultInjectionIOType::kMetadataWrite);
|
||||
fault_fs_guard->SetIOActivtiesExemptedFromFaultInjection({});
|
||||
fault_fs_guard->SetIOActivtiesExcludedFromFaultInjection({});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ DECLARE_int32(metadata_read_fault_one_in);
|
|||
DECLARE_int32(metadata_write_fault_one_in);
|
||||
DECLARE_int32(read_fault_one_in);
|
||||
DECLARE_int32(write_fault_one_in);
|
||||
DECLARE_bool(exclude_wal_from_write_fault_injection);
|
||||
DECLARE_int32(open_metadata_read_fault_one_in);
|
||||
DECLARE_int32(open_metadata_write_fault_one_in);
|
||||
DECLARE_int32(open_write_fault_one_in);
|
||||
|
|
|
@ -436,7 +436,7 @@ void StressTest::FinishInitDb(SharedState* shared) {
|
|||
void StressTest::TrackExpectedState(SharedState* shared) {
|
||||
// When data loss is simulated, recovery from potential data loss is a prefix
|
||||
// recovery that requires tracing
|
||||
if (MightHaveDataLoss() && IsStateTracked()) {
|
||||
if (MightHaveUnsyncedDataLoss() && IsStateTracked()) {
|
||||
Status s = shared->SaveAtAndAfter(db_);
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "Error enabling history tracing: %s\n",
|
||||
|
@ -3475,17 +3475,23 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
}
|
||||
// TODO; test transaction DB Open with fault injection
|
||||
if (!FLAGS_use_txn) {
|
||||
bool inject_sync_fault = FLAGS_sync_fault_injection;
|
||||
bool inject_open_meta_read_error =
|
||||
FLAGS_open_metadata_read_fault_one_in > 0;
|
||||
bool inject_open_meta_write_error =
|
||||
FLAGS_open_metadata_write_fault_one_in > 0;
|
||||
bool inject_open_read_error = FLAGS_open_read_fault_one_in > 0;
|
||||
bool inject_open_write_error = FLAGS_open_write_fault_one_in > 0;
|
||||
if ((inject_open_meta_read_error || inject_open_meta_write_error ||
|
||||
inject_open_read_error || inject_open_write_error) &&
|
||||
if ((inject_sync_fault || inject_open_meta_read_error ||
|
||||
inject_open_meta_write_error || inject_open_read_error ||
|
||||
inject_open_write_error) &&
|
||||
fault_fs_guard
|
||||
->FileExists(FLAGS_db + "/CURRENT", IOOptions(), nullptr)
|
||||
.ok()) {
|
||||
if (inject_sync_fault || inject_open_write_error) {
|
||||
fault_fs_guard->SetFilesystemDirectWritable(false);
|
||||
fault_fs_guard->SetInjectUnsyncedDataLoss(inject_sync_fault);
|
||||
}
|
||||
fault_fs_guard->SetThreadLocalErrorContext(
|
||||
FaultInjectionIOType::kMetadataRead,
|
||||
static_cast<uint32_t>(FLAGS_seed),
|
||||
|
@ -3509,7 +3515,6 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
fault_fs_guard->EnableThreadLocalErrorInjection(
|
||||
FaultInjectionIOType::kRead);
|
||||
|
||||
fault_fs_guard->SetFilesystemDirectWritable(false);
|
||||
fault_fs_guard->SetThreadLocalErrorContext(
|
||||
FaultInjectionIOType::kWrite, static_cast<uint32_t>(FLAGS_seed),
|
||||
FLAGS_open_write_fault_one_in, false /* retryable */,
|
||||
|
@ -3544,11 +3549,12 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
}
|
||||
}
|
||||
|
||||
if (inject_open_meta_read_error || inject_open_meta_write_error ||
|
||||
inject_open_read_error || inject_open_write_error) {
|
||||
if (inject_sync_fault || inject_open_meta_read_error ||
|
||||
inject_open_meta_write_error || inject_open_read_error ||
|
||||
inject_open_write_error) {
|
||||
fault_fs_guard->SetInjectUnsyncedDataLoss(false);
|
||||
fault_fs_guard->DisableThreadLocalErrorInjection(
|
||||
FaultInjectionIOType::kRead);
|
||||
fault_fs_guard->SetFilesystemDirectWritable(true);
|
||||
fault_fs_guard->DisableThreadLocalErrorInjection(
|
||||
FaultInjectionIOType::kWrite);
|
||||
fault_fs_guard->DisableThreadLocalErrorInjection(
|
||||
|
@ -3571,9 +3577,10 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
}
|
||||
}
|
||||
if (!s.ok()) {
|
||||
// After failure to opening a DB due to IO error, retry should
|
||||
// successfully open the DB with correct data if no IO error shows
|
||||
// up.
|
||||
// After failure to opening a DB due to IO error or unsynced data
|
||||
// loss, retry should successfully open the DB with correct data if
|
||||
// no IO error shows up.
|
||||
inject_sync_fault = false;
|
||||
inject_open_meta_read_error = false;
|
||||
inject_open_meta_write_error = false;
|
||||
inject_open_read_error = false;
|
||||
|
@ -3770,7 +3777,7 @@ void StressTest::Reopen(ThreadState* thread) {
|
|||
clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
|
||||
Open(thread->shared, /*reopen=*/true);
|
||||
|
||||
if (thread->shared->GetStressTest()->MightHaveDataLoss() &&
|
||||
if (thread->shared->GetStressTest()->MightHaveUnsyncedDataLoss() &&
|
||||
IsStateTracked()) {
|
||||
Status s = thread->shared->SaveAtAndAfter(db_);
|
||||
if (!s.ok()) {
|
||||
|
|
|
@ -44,9 +44,8 @@ class StressTest {
|
|||
virtual void VerifyDb(ThreadState* thread) const = 0;
|
||||
virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0;
|
||||
void PrintStatistics();
|
||||
bool MightHaveDataLoss() {
|
||||
return FLAGS_sync_fault_injection || FLAGS_write_fault_one_in > 0 ||
|
||||
FLAGS_metadata_write_fault_one_in > 0 || FLAGS_disable_wal ||
|
||||
bool MightHaveUnsyncedDataLoss() {
|
||||
return FLAGS_sync_fault_injection || FLAGS_disable_wal ||
|
||||
FLAGS_manual_wal_flush_one_in > 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -90,10 +90,10 @@ int db_stress_tool(int argc, char** argv) {
|
|||
FaultInjectionTestFS* fs =
|
||||
new FaultInjectionTestFS(raw_env->GetFileSystem());
|
||||
fault_fs_guard.reset(fs);
|
||||
// Set it to direct writable here to not lose files created during DB open
|
||||
// when no open fault injection is not enabled.
|
||||
// This will be overwritten in StressTest::Open() for open fault injection
|
||||
// and in RunStressTestImpl() for proper write fault injection setup.
|
||||
// Set it to direct writable here to initially bypass any fault injection
|
||||
// during DB open This will correspondingly be overwritten in
|
||||
// StressTest::Open() for open fault injection and in RunStressTestImpl()
|
||||
// for proper fault injection setup.
|
||||
fault_fs_guard->SetFilesystemDirectWritable(true);
|
||||
fault_env_guard =
|
||||
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
|
||||
|
|
|
@ -222,6 +222,7 @@ default_params = {
|
|||
"metadata_write_fault_one_in": lambda: random.choice([0, 128, 1000]),
|
||||
"read_fault_one_in": lambda: random.choice([0, 32, 1000]),
|
||||
"write_fault_one_in": lambda: random.choice([0, 128, 1000]),
|
||||
"exclude_wal_from_write_fault_injection": 0,
|
||||
"open_metadata_write_fault_one_in": lambda: random.choice([0, 0, 8]),
|
||||
"open_metadata_read_fault_one_in": lambda: random.choice([0, 0, 8]),
|
||||
"open_write_fault_one_in": lambda: random.choice([0, 0, 16]),
|
||||
|
@ -622,11 +623,11 @@ multiops_txn_default_params = {
|
|||
"enable_compaction_filter": 0,
|
||||
"create_timestamped_snapshot_one_in": 50,
|
||||
"sync_fault_injection": 0,
|
||||
"metadata_write_fault_one_in": 0,
|
||||
"manual_wal_flush": 0,
|
||||
# This test has aggressive flush frequency and small write buffer size.
|
||||
# Disabling write fault to avoid writes being stopped.
|
||||
"write_fault_one_in": 0,
|
||||
"metadata_write_fault_one_in": 0,
|
||||
# PutEntity in transactions is not yet implemented
|
||||
"use_put_entity_one_in": 0,
|
||||
"use_get_entity": 0,
|
||||
|
@ -736,14 +737,10 @@ def finalize_and_sanitize(src_params):
|
|||
# logic for unsynced data loss relies on max sequence number stored
|
||||
# in MANIFEST, so they don't work together.
|
||||
dest_params["sync_fault_injection"] = 0
|
||||
dest_params["write_fault_one_in"] = 0
|
||||
dest_params["metadata_write_fault_one_in"] = 0
|
||||
dest_params["disable_wal"] = 0
|
||||
dest_params["manual_wal_flush_one_in"] = 0
|
||||
if (
|
||||
dest_params.get("sync_fault_injection") == 1
|
||||
or dest_params.get("write_fault_one_in") > 0
|
||||
or dest_params.get("metadata_write_fault_one_in") > 0
|
||||
or dest_params.get("disable_wal") == 1
|
||||
or dest_params.get("manual_wal_flush_one_in") > 0
|
||||
):
|
||||
|
@ -759,6 +756,12 @@ def finalize_and_sanitize(src_params):
|
|||
# files, which would be problematic when unsynced data can be lost in
|
||||
# crash recoveries.
|
||||
dest_params["enable_compaction_filter"] = 0
|
||||
# Prefix-recoverability relies on tracing successful user writes.
|
||||
# Currently we trace all user writes regardless of whether it later succeeds or not.
|
||||
# To simplify, we disable any user write failure injection.
|
||||
# TODO(hx235): support tracing user writes with failure injection.
|
||||
dest_params["metadata_write_fault_one_in"] = 0
|
||||
dest_params["exclude_wal_from_write_fault_injection"] = 1
|
||||
# Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb
|
||||
# unordered_write is only enabled with --txn, and txn_params disables inplace_update_support, so
|
||||
# setting allow_concurrent_memtable_write=1 won't conflcit with inplace_update_support.
|
||||
|
@ -813,8 +816,6 @@ def finalize_and_sanitize(src_params):
|
|||
# compatible with only write committed policy
|
||||
if dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy", 0) != 0:
|
||||
dest_params["sync_fault_injection"] = 0
|
||||
dest_params["write_fault_one_in"] = 0
|
||||
dest_params["metadata_write_fault_one_in"] = 0
|
||||
dest_params["disable_wal"] = 0
|
||||
dest_params["manual_wal_flush_one_in"] = 0
|
||||
# Wide-column pessimistic transaction APIs are initially supported for
|
||||
|
|
|
@ -166,13 +166,13 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions& options,
|
|||
return fs_->GetError();
|
||||
}
|
||||
|
||||
IOStatus s =
|
||||
fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite, options);
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
options, state_.filename_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
||||
if (target_->use_direct_io()) {
|
||||
if (target_->use_direct_io() || !fs_->InjectUnsyncedDataLoss()) {
|
||||
// TODO(hx235): buffer data for direct IO write to simulate data loss like
|
||||
// non-direct IO write
|
||||
s = target_->Append(data, options, dbg);
|
||||
|
@ -201,8 +201,8 @@ IOStatus TestFSWritableFile::Append(
|
|||
return IOStatus::Corruption("Data is corrupted!");
|
||||
}
|
||||
|
||||
IOStatus s =
|
||||
fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite, options);
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
options, state_.filename_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
@ -220,7 +220,7 @@ IOStatus TestFSWritableFile::Append(
|
|||
return IOStatus::Corruption(msg);
|
||||
}
|
||||
|
||||
if (target_->use_direct_io()) {
|
||||
if (target_->use_direct_io() || !fs_->InjectUnsyncedDataLoss()) {
|
||||
// TODO(hx235): buffer data for direct IO write to simulate data loss like
|
||||
// non-direct IO write
|
||||
s = target_->Append(data, options, dbg);
|
||||
|
@ -240,8 +240,8 @@ IOStatus TestFSWritableFile::Truncate(uint64_t size, const IOOptions& options,
|
|||
if (!fs_->IsFilesystemActive()) {
|
||||
return fs_->GetError();
|
||||
}
|
||||
IOStatus s =
|
||||
fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite, options);
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
options, state_.filename_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
@ -264,8 +264,8 @@ IOStatus TestFSWritableFile::PositionedAppend(const Slice& data,
|
|||
if (fs_->ShouldDataCorruptionBeforeWrite()) {
|
||||
return IOStatus::Corruption("Data is corrupted!");
|
||||
}
|
||||
IOStatus s =
|
||||
fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite, options);
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
options, state_.filename_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
@ -290,8 +290,8 @@ IOStatus TestFSWritableFile::PositionedAppend(
|
|||
if (fs_->ShouldDataCorruptionBeforeWrite()) {
|
||||
return IOStatus::Corruption("Data is corrupted!");
|
||||
}
|
||||
IOStatus s =
|
||||
fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite, options);
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
options, state_.filename_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
|
@ -468,7 +468,7 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
|
|||
return fs_->GetError();
|
||||
}
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, options,
|
||||
FaultInjectionIOType::kRead, options, "",
|
||||
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
|
||||
scratch, /*need_count_increase=*/true,
|
||||
/*fault_injected=*/nullptr);
|
||||
|
@ -493,7 +493,7 @@ IOStatus TestFSRandomAccessFile::ReadAsync(
|
|||
}
|
||||
if (res_status.ok()) {
|
||||
res_status = fs_->MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, opts,
|
||||
FaultInjectionIOType::kRead, opts, "",
|
||||
FaultInjectionTestFS::ErrorOperation::kRead, &res.result,
|
||||
use_direct_io(), req.scratch, /*need_count_increase=*/true,
|
||||
/*fault_injected=*/nullptr);
|
||||
|
@ -533,7 +533,7 @@ IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
|||
}
|
||||
bool this_injected_error;
|
||||
reqs[i].status = fs_->MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, options,
|
||||
FaultInjectionIOType::kRead, options, "",
|
||||
FaultInjectionTestFS::ErrorOperation::kRead, &(reqs[i].result),
|
||||
use_direct_io(), reqs[i].scratch,
|
||||
/*need_count_increase=*/true,
|
||||
|
@ -542,7 +542,7 @@ IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
|||
}
|
||||
if (s.ok()) {
|
||||
s = fs_->MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, options,
|
||||
FaultInjectionIOType::kRead, options, "",
|
||||
FaultInjectionTestFS::ErrorOperation::kMultiRead, nullptr,
|
||||
use_direct_io(), nullptr, /*need_count_increase=*/!injected_error,
|
||||
/*fault_injected=*/nullptr);
|
||||
|
@ -598,7 +598,7 @@ IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
|
|||
Slice* result, char* scratch,
|
||||
IODebugContext* dbg) {
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, options,
|
||||
FaultInjectionIOType::kRead, options, "",
|
||||
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
|
||||
scratch, true /*need_count_increase=*/, nullptr /* fault_injected*/);
|
||||
if (!s.ok()) {
|
||||
|
@ -623,7 +623,7 @@ IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
|
|||
Slice* result, char* scratch,
|
||||
IODebugContext* dbg) {
|
||||
IOStatus s = fs_->MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, options,
|
||||
FaultInjectionIOType::kRead, options, "",
|
||||
FaultInjectionTestFS::ErrorOperation::kRead, result, use_direct_io(),
|
||||
scratch, true /*need_count_increase=*/, nullptr /* fault_injected */);
|
||||
if (!s.ok()) {
|
||||
|
@ -707,12 +707,12 @@ IOStatus FaultInjectionTestFS::NewWritableFile(
|
|||
return GetError();
|
||||
}
|
||||
|
||||
if (ShouldUseDiretWritable(fname)) {
|
||||
if (IsFilesystemDirectWritable()) {
|
||||
return target()->NewWritableFile(fname, file_opts, result, dbg);
|
||||
}
|
||||
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kMetadataWrite, file_opts.io_options);
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
file_opts.io_options, fname);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
|
@ -744,11 +744,11 @@ IOStatus FaultInjectionTestFS::ReopenWritableFile(
|
|||
if (!IsFilesystemActive()) {
|
||||
return GetError();
|
||||
}
|
||||
if (ShouldUseDiretWritable(fname)) {
|
||||
if (IsFilesystemDirectWritable()) {
|
||||
return target()->ReopenWritableFile(fname, file_opts, result, dbg);
|
||||
}
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kMetadataWrite, file_opts.io_options);
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
file_opts.io_options, fname);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
|
@ -820,11 +820,11 @@ IOStatus FaultInjectionTestFS::NewRandomRWFile(
|
|||
if (!IsFilesystemActive()) {
|
||||
return GetError();
|
||||
}
|
||||
if (ShouldUseDiretWritable(fname)) {
|
||||
if (IsFilesystemDirectWritable()) {
|
||||
return target()->NewRandomRWFile(fname, file_opts, result, dbg);
|
||||
}
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kMetadataWrite, file_opts.io_options);
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kWrite,
|
||||
file_opts.io_options, fname);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
|
@ -855,8 +855,11 @@ IOStatus FaultInjectionTestFS::NewRandomAccessFile(
|
|||
if (!IsFilesystemActive()) {
|
||||
return GetError();
|
||||
}
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kRead,
|
||||
file_opts.io_options);
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, file_opts.io_options, fname,
|
||||
ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
|
||||
nullptr /* scratch */, true /*need_count_increase*/,
|
||||
nullptr /*fault_injected*/);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
|
@ -875,8 +878,11 @@ IOStatus FaultInjectionTestFS::NewSequentialFile(
|
|||
if (!IsFilesystemActive()) {
|
||||
return GetError();
|
||||
}
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(FaultInjectionIOType::kRead,
|
||||
file_opts.io_options);
|
||||
IOStatus io_s = MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType::kRead, file_opts.io_options, fname,
|
||||
ErrorOperation::kOpen, nullptr /* result */, false /* direct_io */,
|
||||
nullptr /* scratch */, true /*need_count_increase*/,
|
||||
nullptr /*fault_injected*/);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
|
@ -1237,7 +1243,7 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError(
|
|||
ErrorContext* ctx =
|
||||
static_cast<ErrorContext*>(injected_thread_local_read_error_.Get());
|
||||
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
|
||||
ShouldIOActivtiesExemptFromFaultInjection(io_options.io_activity)) {
|
||||
ShouldIOActivtiesExcludedFromFaultInjection(io_options.io_activity)) {
|
||||
return IOStatus::OK();
|
||||
}
|
||||
|
||||
|
@ -1303,8 +1309,9 @@ bool FaultInjectionTestFS::TryParseFileName(const std::string& file_name,
|
|||
}
|
||||
|
||||
IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType type, const IOOptions& io_options, ErrorOperation op,
|
||||
Slice* result, bool direct_io, char* scratch, bool need_count_increase,
|
||||
FaultInjectionIOType type, const IOOptions& io_options,
|
||||
const std::string& file_name, ErrorOperation op, Slice* result,
|
||||
bool direct_io, char* scratch, bool need_count_increase,
|
||||
bool* fault_injected) {
|
||||
if (type == FaultInjectionIOType::kRead) {
|
||||
return MaybeInjectThreadLocalReadError(io_options, op, result, direct_io,
|
||||
|
@ -1314,7 +1321,9 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError(
|
|||
|
||||
ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
|
||||
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
|
||||
ShouldIOActivtiesExemptFromFaultInjection(io_options.io_activity)) {
|
||||
ShouldIOActivtiesExcludedFromFaultInjection(io_options.io_activity) ||
|
||||
(type == FaultInjectionIOType::kWrite &&
|
||||
ShouldExcludeFromWriteFaultInjection(file_name))) {
|
||||
return IOStatus::OK();
|
||||
}
|
||||
|
||||
|
|
|
@ -207,6 +207,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
: FileSystemWrapper(base),
|
||||
filesystem_active_(true),
|
||||
filesystem_writable_(false),
|
||||
inject_unsynced_data_loss_(false),
|
||||
read_unsynced_data_(true),
|
||||
allow_link_open_file_(false),
|
||||
injected_thread_local_read_error_(DeleteThreadLocalErrorContext),
|
||||
|
@ -359,19 +360,6 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
MutexLock l(&mutex_);
|
||||
return filesystem_writable_;
|
||||
}
|
||||
bool ShouldUseDiretWritable(const std::string& file_name) {
|
||||
MutexLock l(&mutex_);
|
||||
if (filesystem_writable_) {
|
||||
return true;
|
||||
}
|
||||
FileType file_type = kTempFile;
|
||||
uint64_t file_number = 0;
|
||||
if (!TryParseFileName(file_name, &file_number, &file_type)) {
|
||||
return false;
|
||||
}
|
||||
return direct_writable_types_.find(file_type) !=
|
||||
direct_writable_types_.end();
|
||||
}
|
||||
void SetFilesystemActiveNoLock(
|
||||
bool active, IOStatus error = IOStatus::Corruption("Not active")) {
|
||||
error.PermitUncheckedError();
|
||||
|
@ -391,6 +379,18 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
filesystem_writable_ = writable;
|
||||
}
|
||||
|
||||
// If true, we buffer write data in memory to simulate data loss upon system
|
||||
// crash by only having process crashes
|
||||
void SetInjectUnsyncedDataLoss(bool inject) {
|
||||
MutexLock l(&mutex_);
|
||||
inject_unsynced_data_loss_ = inject;
|
||||
}
|
||||
|
||||
bool InjectUnsyncedDataLoss() {
|
||||
MutexLock l(&mutex_);
|
||||
return inject_unsynced_data_loss_;
|
||||
}
|
||||
|
||||
// In places (e.g. GetSortedWals()) RocksDB relies on querying the file size
|
||||
// or even reading the contents of files currently open for writing, and
|
||||
// as in POSIX semantics, expects to see the flushed size and contents
|
||||
|
@ -414,21 +414,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
allow_link_open_file_ = allow_link_open_file;
|
||||
}
|
||||
|
||||
void SetDirectWritableTypes(const std::set<FileType>& types) {
|
||||
bool ShouldIOActivtiesExcludedFromFaultInjection(Env::IOActivity io_activty) {
|
||||
MutexLock l(&mutex_);
|
||||
direct_writable_types_ = types;
|
||||
}
|
||||
|
||||
void SetIOActivtiesExemptedFromFaultInjection(
|
||||
const std::set<Env::IOActivity>& io_activties) {
|
||||
MutexLock l(&mutex_);
|
||||
io_activties_exempted_from_fault_injection = io_activties;
|
||||
}
|
||||
|
||||
bool ShouldIOActivtiesExemptFromFaultInjection(Env::IOActivity io_activty) {
|
||||
MutexLock l(&mutex_);
|
||||
return io_activties_exempted_from_fault_injection.find(io_activty) !=
|
||||
io_activties_exempted_from_fault_injection.end();
|
||||
return io_activties_excluded_from_fault_injection.find(io_activty) !=
|
||||
io_activties_excluded_from_fault_injection.end();
|
||||
}
|
||||
|
||||
// Asserts that open_managed_files_ is empty. The check lives entirely inside
// assert(), so it is compiled out when asserts are disabled.
void AssertNoOpenFile() {
  assert(open_managed_files_.empty());
}
|
||||
|
@ -505,8 +494,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
|
||||
IOStatus MaybeInjectThreadLocalError(
|
||||
FaultInjectionIOType type, const IOOptions& io_options,
|
||||
ErrorOperation op = kUnknown, Slice* slice = nullptr,
|
||||
bool direct_io = false, char* scratch = nullptr,
|
||||
const std::string& file_name = "", ErrorOperation op = kUnknown,
|
||||
Slice* slice = nullptr, bool direct_io = false, char* scratch = nullptr,
|
||||
bool need_count_increase = false, bool* fault_injected = nullptr);
|
||||
|
||||
int GetAndResetInjectedThreadLocalErrorCount(FaultInjectionIOType type) {
|
||||
|
@ -519,6 +508,29 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
return count;
|
||||
}
|
||||
|
||||
void SetIOActivtiesExcludedFromFaultInjection(
|
||||
const std::set<Env::IOActivity>& io_activties) {
|
||||
MutexLock l(&mutex_);
|
||||
io_activties_excluded_from_fault_injection = io_activties;
|
||||
}
|
||||
|
||||
void SetFileTypesExcludedFromWriteFaultInjection(
|
||||
const std::set<FileType>& types) {
|
||||
MutexLock l(&mutex_);
|
||||
file_types_excluded_from_write_fault_injection_ = types;
|
||||
}
|
||||
|
||||
bool ShouldExcludeFromWriteFaultInjection(const std::string& file_name) {
|
||||
MutexLock l(&mutex_);
|
||||
FileType file_type = kTempFile;
|
||||
uint64_t file_number = 0;
|
||||
if (!TryParseFileName(file_name, &file_number, &file_type)) {
|
||||
return false;
|
||||
}
|
||||
return file_types_excluded_from_write_fault_injection_.find(file_type) !=
|
||||
file_types_excluded_from_write_fault_injection_.end();
|
||||
}
|
||||
|
||||
void EnableThreadLocalErrorInjection(FaultInjectionIOType type) {
|
||||
ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
|
||||
if (ctx) {
|
||||
|
@ -551,6 +563,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
bool filesystem_active_; // Record flushes, syncs, writes
|
||||
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
|
||||
// to underlying FS for writable files
|
||||
bool inject_unsynced_data_loss_; // See InjectUnsyncedDataLoss()
|
||||
bool read_unsynced_data_; // See SetReadUnsyncedData()
|
||||
bool allow_link_open_file_; // See SetAllowLinkOpenFile()
|
||||
IOStatus fs_error_;
|
||||
|
@ -588,8 +601,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
}
|
||||
};
|
||||
|
||||
std::set<FileType> direct_writable_types_;
|
||||
std::set<Env::IOActivity> io_activties_exempted_from_fault_injection;
|
||||
std::set<FileType> file_types_excluded_from_write_fault_injection_;
|
||||
std::set<Env::IOActivity> io_activties_excluded_from_fault_injection;
|
||||
ThreadLocalPtr injected_thread_local_read_error_;
|
||||
ThreadLocalPtr injected_thread_local_write_error_;
|
||||
ThreadLocalPtr injected_thread_local_metadata_read_error_;
|
||||
|
|
Loading…
Reference in New Issue