mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 22:41:48 +00:00
408e8d4c85
Summary: **Context/Summary:** We discovered the following false positive in our crash test lately: (1) PUT() writes k/v to WAL but fails in `ApplyWALToManifest()`. The k/v is in the WAL (2) Current stress test logic will rollback the expected state of such k/v since PUT() fails (3) If the DB crashes before recovery finishes and reopens, the WAL will be replayed and the k/v is in the DB while the expected state have been roll-backed. We decided to leave those expected state to be pending until the loop-write of the same key succeeds. Bonus: Now that I realized write to manifest can also fail the write which faces the similar problem as https://github.com/facebook/rocksdb/pull/12797, I decided to disable fault injection on user write per thread (instead of globally) when tracing is needed for prefix recovery; some refactory Pull Request resolved: https://github.com/facebook/rocksdb/pull/12838 Test Plan: Rehearsal CI Run below command (varies on sync_fault_injection=1,0 to verify ExpectedState behavior) for a while to ensure crash recovery validation works fine ``` python3 tools/db_crashtest.py --simple blackbox --interval=30 --WAL_size_limit_MB=0 --WAL_ttl_seconds=0 --acquire_snapshot_one_in=10000 --adaptive_readahead=1 --adm_policy=1 --advise_random_on_open=0 --allow_concurrent_memtable_write=0 --allow_data_in_errors=True --allow_fallocate=0 --async_io=0 --auto_readahead_size=0 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=0 --batch_protection_bytes_per_key=0 --bgerror_resume_retry_interval=1000000 --block_align=1 --block_protection_bytes_per_key=4 --block_size=16384 --bloom_before_level=4 --bloom_bits=56.810257702625165 --bottommost_compression_type=none --bottommost_file_compaction_delay=0 --bytes_per_sync=262144 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=1 --cache_size=8388608 --cache_type=auto_hyper_clock_cache 
--charge_compression_dictionary_building_buffer=1 --charge_file_metadata=1 --charge_filter_construction=1 --charge_table_reader=0 --check_multiget_consistency=0 --check_multiget_entity_consistency=1 --checkpoint_one_in=10000 --checksum_type=kxxHash --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=1000 --compact_range_one_in=1000 --compaction_pri=4 --compaction_readahead_size=1048576 --compaction_ttl=10 --compress_format_version=1 --compressed_secondary_cache_ratio=0.0 --compressed_secondary_cache_size=0 --compression_checksum=0 --compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 --compression_type=none --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --daily_offpeak_time_utc=04:00-08:00 --data_block_index_type=1 --db_write_buffer_size=0 --default_temperature=kWarm --default_write_temperature=kCold --delete_obsolete_files_period_micros=30000000 --delpercent=20 --delrangepercent=20 --destroy_db_initially=0 --detect_filter_construct_corruption=0 --disable_file_deletions_one_in=10000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=0 --enable_checksum_handoff=1 --enable_compaction_filter=0 --enable_custom_split_merge=0 --enable_do_not_compress_roles=0 --enable_index_compression=1 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=0 --enable_sst_partitioner_factory=0 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=0 --error_recovery_with_no_fault_injection=1 --exclude_wal_from_write_fault_injection=0 --fail_if_options_file_error=1 --fifo_allow_compaction=0 --file_checksum_impl=crc32c --fill_cache=1 --flush_one_in=1000000 --format_version=3 --get_all_column_family_metadata_one_in=1000000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=1000000 --get_properties_of_all_tables_one_in=1000000 --get_property_one_in=100000 --get_sorted_wal_files_one_in=0 
--hard_pending_compaction_bytes_limit=274877906944 --high_pri_pool_ratio=0.5 --index_block_restart_interval=4 --index_shortening=2 --index_type=0 --ingest_external_file_one_in=0 --initial_auto_readahead_size=16384 --inplace_update_support=0 --iterpercent=10 --key_len_percent_dist=1,30,69 --key_may_exist_one_in=100 --last_level_temperature=kWarm --level_compaction_dynamic_level_bytes=1 --lock_wal_one_in=10000 --log_file_time_to_roll=60 --log_readahead_size=16777216 --long_running_snapshots=1 --low_pri_pool_ratio=0 --lowest_used_cache_tier=0 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=10 --max_auto_readahead_size=16384 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=32768 --max_sequential_skip_in_iterations=1 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16 --max_write_buffer_number=10 --max_write_buffer_size_to_maintain=8388608 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=0 --memtable_prefix_bloom_size_ratio=0.01 --memtable_protection_bytes_per_key=1 --memtable_whole_key_filtering=1 --memtablerep=skip_list --metadata_charge_policy=1 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=8 --min_write_buffer_number_to_merge=1 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=1 --open_files=-1 --open_metadata_read_fault_one_in=0 --open_metadata_write_fault_one_in=8 --open_read_fault_one_in=0 --open_write_fault_one_in=8 --ops_per_thread=100000000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=1 --optimize_multiget_for_io=1 --paranoid_file_checks=0 --partition_filters=0 --partition_pinning=3 --pause_background_one_in=1000000 --periodic_compaction_seconds=2 --prefix_size=7 --prefixpercent=0 --prepopulate_block_cache=0 --preserve_internal_time_seconds=0 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 
--read_fault_one_in=1000 --readahead_size=524288 --readpercent=10 --recycle_log_file_num=1 --reopen=0 --report_bg_io_stats=0 --reset_stats_one_in=1000000 --sample_for_compression=0 --secondary_cache_fault_one_in=0 --set_options_one_in=0 --skip_stats_update_on_db_open=1 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=foo --sqfc_version=0 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=10 --stats_history_buffer_size=0 --strict_bytes_per_sync=1 --subcompactions=4 --sync=1 --sync_fault_injection=0 --table_cache_numshardbits=6 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=2 --uncache_aggressiveness=239 --universal_max_read_amp=-1 --unpartitioned_pinning=1 --use_adaptive_mutex=1 --use_adaptive_mutex_lru=1 --use_attribute_group=0 --use_delta_encoding=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=0 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=0 --use_put_entity_one_in=0 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_compression=0 --verify_db_one_in=100000 --verify_file_checksums_one_in=1000000 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none --write_buffer_size=33554432 --write_dbid_to_manifest=0 --write_fault_one_in=8 --writepercent=40 ``` Reviewed By: cbi42 Differential Revision: D59377075 Pulled By: hx235 fbshipit-source-id: 91f602fd67e2d339d378cd28b982095fd073dcb6
761 lines
28 KiB
C++
761 lines
28 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright 2014 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
// This test uses a custom FileSystem to keep track of the state of a file
|
|
// system as of the last "Sync". The data being written is cached in a "buffer".
|
|
// Only when "Sync" is called, the data will be persistent. It can simulate
|
|
// file data loss (or entire files) not protected by a "Sync". For any of the
|
|
// FileSystem related operations, by specifying the "IOStatus Error", a specific
|
|
// error can be returned when file system is not activated.
|
|
|
|
#pragma once
|
|
|
|
#include <algorithm>
|
|
#include <map>
|
|
#include <set>
|
|
#include <string>
|
|
|
|
#include "file/filename.h"
|
|
#include "rocksdb/file_system.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/random.h"
|
|
#include "util/thread_local.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class TestFSWritableFile;
|
|
class FaultInjectionTestFS;
|
|
|
|
// Category of file-system operation a fault can be injected into.
// FaultInjectionTestFS keeps one thread-local error context per enumerator.
// The numeric values are spelled out so they stay stable if enumerators are
// ever reordered.
enum class FaultInjectionIOType {
  kRead = 0,
  kWrite = 1,
  kMetadataRead = 2,
  kMetadataWrite = 3,
};
|
|
|
|
struct FSFileState {
|
|
std::string filename_;
|
|
uint64_t pos_at_last_append_ = 0;
|
|
uint64_t pos_at_last_sync_ = 0;
|
|
std::string buffer_;
|
|
|
|
explicit FSFileState(const std::string& filename = {})
|
|
: filename_(filename) {}
|
|
|
|
bool IsFullySynced() const {
|
|
return pos_at_last_append_ == pos_at_last_sync_;
|
|
}
|
|
|
|
IOStatus DropUnsyncedData();
|
|
|
|
IOStatus DropRandomUnsyncedData(Random* rand);
|
|
};
|
|
|
|
// A wrapper around WritableFileWriter* file
|
|
// is written to or sync'ed.
|
|
class TestFSWritableFile : public FSWritableFile {
|
|
public:
|
|
explicit TestFSWritableFile(const std::string& fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<FSWritableFile>&& f,
|
|
FaultInjectionTestFS* fs);
|
|
virtual ~TestFSWritableFile();
|
|
IOStatus Append(const Slice& data, const IOOptions&,
|
|
IODebugContext*) override;
|
|
IOStatus Append(const Slice& data, const IOOptions& options,
|
|
const DataVerificationInfo& verification_info,
|
|
IODebugContext* dbg) override;
|
|
IOStatus Truncate(uint64_t size, const IOOptions& options,
|
|
IODebugContext* dbg) override;
|
|
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
|
|
IOStatus Flush(const IOOptions&, IODebugContext*) override;
|
|
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override;
|
|
IOStatus RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/,
|
|
const IOOptions& options, IODebugContext* dbg) override;
|
|
bool IsSyncThreadSafe() const override { return true; }
|
|
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
|
const IOOptions& options,
|
|
IODebugContext* dbg) override;
|
|
IOStatus PositionedAppend(const Slice& data, uint64_t offset,
|
|
const IOOptions& options,
|
|
const DataVerificationInfo& verification_info,
|
|
IODebugContext* dbg) override;
|
|
size_t GetRequiredBufferAlignment() const override {
|
|
return target_->GetRequiredBufferAlignment();
|
|
}
|
|
bool use_direct_io() const override { return target_->use_direct_io(); }
|
|
|
|
uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
|
|
MutexLock l(&mutex_);
|
|
return target_->GetFileSize(options, dbg);
|
|
}
|
|
|
|
private:
|
|
FSFileState state_; // Need protection by mutex_
|
|
FileOptions file_opts_;
|
|
std::unique_ptr<FSWritableFile> target_;
|
|
bool writable_file_opened_;
|
|
FaultInjectionTestFS* fs_;
|
|
port::Mutex mutex_;
|
|
const bool unsync_data_loss_;
|
|
};
|
|
|
|
// A wrapper around WritableFileWriter* file
|
|
// is written to or sync'ed.
|
|
class TestFSRandomRWFile : public FSRandomRWFile {
|
|
public:
|
|
explicit TestFSRandomRWFile(const std::string& fname,
|
|
std::unique_ptr<FSRandomRWFile>&& f,
|
|
FaultInjectionTestFS* fs);
|
|
virtual ~TestFSRandomRWFile();
|
|
IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& options,
|
|
IODebugContext* dbg) override;
|
|
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
|
|
Slice* result, char* scratch,
|
|
IODebugContext* dbg) const override;
|
|
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
|
|
IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override;
|
|
IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override;
|
|
size_t GetRequiredBufferAlignment() const override {
|
|
return target_->GetRequiredBufferAlignment();
|
|
}
|
|
bool use_direct_io() const override { return target_->use_direct_io(); }
|
|
|
|
private:
|
|
std::unique_ptr<FSRandomRWFile> target_;
|
|
bool file_opened_;
|
|
FaultInjectionTestFS* fs_;
|
|
};
|
|
|
|
class TestFSRandomAccessFile : public FSRandomAccessFile {
|
|
public:
|
|
explicit TestFSRandomAccessFile(const std::string& fname,
|
|
std::unique_ptr<FSRandomAccessFile>&& f,
|
|
FaultInjectionTestFS* fs);
|
|
~TestFSRandomAccessFile() override {}
|
|
IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
|
|
Slice* result, char* scratch,
|
|
IODebugContext* dbg) const override;
|
|
IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
|
|
std::function<void(FSReadRequest&, void*)> cb,
|
|
void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
|
|
IODebugContext* dbg) override;
|
|
IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
|
const IOOptions& options, IODebugContext* dbg) override;
|
|
size_t GetRequiredBufferAlignment() const override {
|
|
return target_->GetRequiredBufferAlignment();
|
|
}
|
|
bool use_direct_io() const override { return target_->use_direct_io(); }
|
|
|
|
size_t GetUniqueId(char* id, size_t max_size) const override;
|
|
|
|
private:
|
|
std::unique_ptr<FSRandomAccessFile> target_;
|
|
FaultInjectionTestFS* fs_;
|
|
};
|
|
|
|
class TestFSSequentialFile : public FSSequentialFileOwnerWrapper {
|
|
public:
|
|
explicit TestFSSequentialFile(std::unique_ptr<FSSequentialFile>&& f,
|
|
FaultInjectionTestFS* fs, std::string fname)
|
|
: FSSequentialFileOwnerWrapper(std::move(f)),
|
|
fs_(fs),
|
|
fname_(std::move(fname)) {}
|
|
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
|
|
char* scratch, IODebugContext* dbg) override;
|
|
IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& options,
|
|
Slice* result, char* scratch,
|
|
IODebugContext* dbg) override;
|
|
|
|
private:
|
|
FaultInjectionTestFS* fs_;
|
|
std::string fname_;
|
|
uint64_t read_pos_ = 0;
|
|
uint64_t target_read_pos_ = 0;
|
|
};
|
|
|
|
class TestFSDirectory : public FSDirectory {
|
|
public:
|
|
explicit TestFSDirectory(FaultInjectionTestFS* fs, std::string dirname,
|
|
FSDirectory* dir)
|
|
: fs_(fs), dirname_(std::move(dirname)), dir_(dir) {}
|
|
~TestFSDirectory() {}
|
|
|
|
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override;
|
|
|
|
IOStatus Close(const IOOptions& options, IODebugContext* dbg) override;
|
|
|
|
IOStatus FsyncWithDirOptions(
|
|
const IOOptions& options, IODebugContext* dbg,
|
|
const DirFsyncOptions& dir_fsync_options) override;
|
|
|
|
private:
|
|
FaultInjectionTestFS* fs_;
|
|
std::string dirname_;
|
|
std::unique_ptr<FSDirectory> dir_;
|
|
};
|
|
|
|
class FaultInjectionTestFS : public FileSystemWrapper {
|
|
public:
|
|
explicit FaultInjectionTestFS(const std::shared_ptr<FileSystem>& base)
|
|
: FileSystemWrapper(base),
|
|
filesystem_active_(true),
|
|
filesystem_writable_(false),
|
|
inject_unsynced_data_loss_(false),
|
|
read_unsynced_data_(true),
|
|
allow_link_open_file_(false),
|
|
injected_thread_local_read_error_(DeleteThreadLocalErrorContext),
|
|
injected_thread_local_write_error_(DeleteThreadLocalErrorContext),
|
|
injected_thread_local_metadata_read_error_(
|
|
DeleteThreadLocalErrorContext),
|
|
injected_thread_local_metadata_write_error_(
|
|
DeleteThreadLocalErrorContext),
|
|
ingest_data_corruption_before_write_(false),
|
|
checksum_handoff_func_type_(kCRC32c),
|
|
fail_get_file_unique_id_(false) {}
|
|
// Marks the stored file-system error as permitted-unchecked before the
// object goes away.
~FaultInjectionTestFS() override {
  fs_error_.PermitUncheckedError();
}
|
|
|
|
static const char* kClassName() { return "FaultInjectionTestFS"; }
|
|
const char* Name() const override { return kClassName(); }
|
|
|
|
static bool IsInjectedError(const Status& s) {
|
|
assert(!s.ok());
|
|
return std::strstr(s.getState(), kInjected.c_str());
|
|
}
|
|
|
|
static bool IsFailedToWriteToWALError(const Status& s) {
|
|
assert(!s.ok());
|
|
return std::strstr(s.getState(), kFailedToWriteToWAL.c_str());
|
|
}
|
|
|
|
IOStatus NewDirectory(const std::string& name, const IOOptions& options,
|
|
std::unique_ptr<FSDirectory>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus FileExists(const std::string& fname, const IOOptions& options,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus GetChildren(const std::string& dir, const IOOptions& options,
|
|
std::vector<std::string>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus GetChildrenFileAttributes(const std::string& dir,
|
|
const IOOptions& options,
|
|
std::vector<FileAttributes>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus NewWritableFile(const std::string& fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<FSWritableFile>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus ReopenWritableFile(const std::string& fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<FSWritableFile>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus ReuseWritableFile(const std::string& fname,
|
|
const std::string& old_fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<FSWritableFile>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus NewRandomRWFile(const std::string& fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<FSRandomRWFile>* result,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus NewRandomAccessFile(const std::string& fname,
|
|
const FileOptions& file_opts,
|
|
std::unique_ptr<FSRandomAccessFile>* result,
|
|
IODebugContext* dbg) override;
|
|
IOStatus NewSequentialFile(const std::string& f, const FileOptions& file_opts,
|
|
std::unique_ptr<FSSequentialFile>* r,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus DeleteFile(const std::string& f, const IOOptions& options,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus GetFileSize(const std::string& f, const IOOptions& options,
|
|
uint64_t* file_size, IODebugContext* dbg) override;
|
|
|
|
IOStatus GetFileModificationTime(const std::string& fname,
|
|
const IOOptions& options,
|
|
uint64_t* file_mtime,
|
|
IODebugContext* dbg) override;
|
|
|
|
IOStatus RenameFile(const std::string& s, const std::string& t,
|
|
const IOOptions& options, IODebugContext* dbg) override;
|
|
|
|
IOStatus LinkFile(const std::string& src, const std::string& target,
|
|
const IOOptions& options, IODebugContext* dbg) override;
|
|
|
|
IOStatus NumFileLinks(const std::string& fname, const IOOptions& options,
|
|
uint64_t* count, IODebugContext* dbg) override;
|
|
|
|
IOStatus AreFilesSame(const std::string& first, const std::string& second,
|
|
const IOOptions& options, bool* res,
|
|
IODebugContext* dbg) override;
|
|
IOStatus GetAbsolutePath(const std::string& db_path, const IOOptions& options,
|
|
std::string* output_path,
|
|
IODebugContext* dbg) override;
|
|
|
|
// Undef to eliminate clash on Windows
|
|
#undef GetFreeSpace
|
|
// Reports the free space for `path`.
//  - If the file system is inactive and the stored error has the kNoSpace
//    subcode, *disk_free is set to 0 and OK is returned.
//  - Otherwise a metadata-read fault may be injected; if none is, the call
//    is forwarded to the target file system.
// Note: fs_error_ is read here without holding mutex_.
IOStatus GetFreeSpace(const std::string& path, const IOOptions& options,
                      uint64_t* disk_free, IODebugContext* dbg) override {
  const bool out_of_space =
      !IsFilesystemActive() &&
      fs_error_.subcode() == IOStatus::SubCode::kNoSpace;
  if (out_of_space) {
    *disk_free = 0;
    return IOStatus();
  }

  IOStatus injected = MaybeInjectThreadLocalError(
      FaultInjectionIOType::kMetadataRead, options);
  if (!injected.ok()) {
    return injected;
  }
  return target()->GetFreeSpace(path, options, disk_free, dbg);
}
|
|
|
|
IOStatus IsDirectory(const std::string& path, const IOOptions& options,
|
|
bool* is_dir, IODebugContext* dgb) override;
|
|
|
|
IOStatus Poll(std::vector<void*>& io_handles,
|
|
size_t min_completions) override;
|
|
|
|
IOStatus AbortIO(std::vector<void*>& io_handles) override;
|
|
|
|
void WritableFileClosed(const FSFileState& state);
|
|
|
|
void WritableFileSynced(const FSFileState& state);
|
|
|
|
void WritableFileAppended(const FSFileState& state);
|
|
|
|
IOStatus DropUnsyncedFileData();
|
|
|
|
IOStatus DropRandomUnsyncedFileData(Random* rnd);
|
|
|
|
IOStatus DeleteFilesCreatedAfterLastDirSync(const IOOptions& options,
|
|
IODebugContext* dbg);
|
|
|
|
void ResetState();
|
|
|
|
void UntrackFile(const std::string& f);
|
|
|
|
// Drops the record of files created under `dirname` since its last sync
// (the entry for `dirname` in dir_to_new_files_since_last_sync_).
void SyncDir(const std::string& dirname) {
  MutexLock guard(&mutex_);
  dir_to_new_files_since_last_sync_.erase(dirname);
}
|
|
|
|
// Setting the filesystem to inactive is the test equivalent to simulating a
|
|
// system reset. Setting to inactive will freeze our saved filesystem state so
|
|
// that it will stop being recorded. It can then be reset back to the state at
|
|
// the time of the reset.
|
|
bool IsFilesystemActive() {
|
|
MutexLock l(&mutex_);
|
|
return filesystem_active_;
|
|
}
|
|
|
|
// Setting filesystem_writable_ makes NewWritableFile. ReopenWritableFile,
|
|
// and NewRandomRWFile bypass FaultInjectionTestFS and go directly to the
|
|
// target FS
|
|
bool IsFilesystemDirectWritable() {
|
|
MutexLock l(&mutex_);
|
|
return filesystem_writable_;
|
|
}
|
|
void SetFilesystemActiveNoLock(
|
|
bool active, IOStatus error = IOStatus::Corruption("Not active")) {
|
|
error.PermitUncheckedError();
|
|
filesystem_active_ = active;
|
|
if (!active) {
|
|
fs_error_ = error;
|
|
}
|
|
}
|
|
void SetFilesystemActive(
|
|
bool active, IOStatus error = IOStatus::Corruption("Not active")) {
|
|
MutexLock l(&mutex_);
|
|
error.PermitUncheckedError();
|
|
SetFilesystemActiveNoLock(active, error);
|
|
}
|
|
// Stores the direct-writable flag under mutex_.
void SetFilesystemDirectWritable(bool writable) {
  MutexLock guard(&mutex_);
  filesystem_writable_ = writable;
}
|
|
|
|
// If true, we buffer write data in memory to simulate data loss upon system
|
|
// crash by only having process crashes
|
|
void SetInjectUnsyncedDataLoss(bool inject) {
|
|
MutexLock l(&mutex_);
|
|
inject_unsynced_data_loss_ = inject;
|
|
}
|
|
|
|
bool InjectUnsyncedDataLoss() {
|
|
MutexLock l(&mutex_);
|
|
return inject_unsynced_data_loss_;
|
|
}
|
|
|
|
// In places (e.g. GetSortedWals()) RocksDB relies on querying the file size
|
|
// or even reading the contents of files currently open for writing, and
|
|
// as in POSIX semantics, expects to see the flushed size and contents
|
|
// regardless of what has been synced. FaultInjectionTestFS historically
|
|
// did not emulate this behavior, only showing synced data from such read
|
|
// operations. (Different from FaultInjectionTestEnv--sigh.) Calling this
|
|
// function with false restores this historical behavior for testing
|
|
// stability, but use of this semantics must be phased out as it is
|
|
// inconsistent with expected FileSystem semantics. In other words, this
|
|
// functionality is DEPRECATED. Intended to be set after construction and
|
|
// unchanged (not thread safe).
|
|
void SetReadUnsyncedData(bool read_unsynced_data) {
|
|
read_unsynced_data_ = read_unsynced_data;
|
|
}
|
|
bool ReadUnsyncedData() const { return read_unsynced_data_; }
|
|
|
|
// FaultInjectionTestFS normally includes a hygiene check for FileSystem
|
|
// implementations that only support LinkFile() on closed files (not open
|
|
// for write). Setting this to true bypasses the check.
|
|
// Plain, unlocked store of the allow-link-open-file flag (defaults to
// enabling it).
void SetAllowLinkOpenFile(bool allow_link_open_file = true) {
  allow_link_open_file_ = allow_link_open_file;
}
|
|
|
|
// Returns true when `io_activty` is a member of the set of IO activities
// excluded from fault injection. The set is consulted under mutex_.
bool ShouldIOActivtiesExcludedFromFaultInjection(Env::IOActivity io_activty) {
  MutexLock guard(&mutex_);
  return io_activties_excluded_from_fault_injection.count(io_activty) > 0;
}
|
|
|
|
// Debug-build check that the set of open managed files is empty. The set is
// read without holding mutex_.
void AssertNoOpenFile() {
  assert(open_managed_files_.empty());
}
|
|
|
|
// Returns a copy of the stored file-system error. The member is read
// without holding mutex_.
IOStatus GetError() {
  return fs_error_;
}
|
|
|
|
// Replaces the stored file-system error with `io_error` under mutex_. The
// argument is first marked as permitted-unchecked.
void SetFileSystemIOError(IOStatus io_error) {
  MutexLock guard(&mutex_);
  io_error.PermitUncheckedError();
  fs_error_ = io_error;
}
|
|
|
|
// To simulate the data corruption before data is written in FS
|
|
void IngestDataCorruptionBeforeWrite() {
|
|
MutexLock l(&mutex_);
|
|
ingest_data_corruption_before_write_ = true;
|
|
}
|
|
|
|
void NoDataCorruptionBeforeWrite() {
|
|
MutexLock l(&mutex_);
|
|
ingest_data_corruption_before_write_ = false;
|
|
}
|
|
|
|
bool ShouldDataCorruptionBeforeWrite() {
|
|
MutexLock l(&mutex_);
|
|
return ingest_data_corruption_before_write_;
|
|
}
|
|
|
|
// Stores the checksum type used for checksum hand-off, under mutex_.
void SetChecksumHandoffFuncType(const ChecksumType& func_type) {
  MutexLock guard(&mutex_);
  checksum_handoff_func_type_ = func_type;
}
|
|
|
|
// Returns the checksum type used for checksum hand-off.
//
// The value is copied out while mutex_ is held. Returning a reference to the
// member (as before) let the caller read it after the lock was released,
// racing with SetChecksumHandoffFuncType(). Returning by value is
// source-compatible for callers: a temporary still binds to a
// `const ChecksumType&`.
ChecksumType GetChecksumHandoffFuncType() {
  MutexLock guard(&mutex_);
  return checksum_handoff_func_type_;
}
|
|
|
|
// Stores the fail-get-unique-id flag under mutex_.
void SetFailGetUniqueId(bool flag) {
  MutexLock guard(&mutex_);
  fail_get_file_unique_id_ = flag;
}

// Returns the fail-get-unique-id flag, read under mutex_.
bool ShouldFailGetUniqueId() {
  MutexLock guard(&mutex_);
  return fail_get_file_unique_id_;
}
|
|
|
|
// Specifies what the operation is, so that the right type of error can be
// injected. Values are spelled out explicitly; they match the implicit
// numbering of the original declaration.
enum ErrorOperation : char {
  kRead = 0,
  kMultiReadSingleReq = 1,
  kMultiRead = 2,
  kOpen = 3,
  kAppend = 4,
  kPositionedAppend = 5,
  kUnknown = 6,
};
|
|
|
|
// Creates a new ErrorContext seeded with `seed`, fills in the injection
// parameters and installs it as the calling thread's context for `type`.
// Any context previously installed for that type is deleted by
// SetErrorContextOfFaultInjectionIOType(). The new context starts with
// injection disabled (see the ErrorContext constructor).
void SetThreadLocalErrorContext(FaultInjectionIOType type, uint32_t seed,
                                int one_in, bool retryable,
                                bool has_data_loss) {
  ErrorContext* fresh = new ErrorContext(seed);
  fresh->count = 0;
  fresh->one_in = one_in;
  fresh->has_data_loss = has_data_loss;
  fresh->retryable = retryable;

  SetErrorContextOfFaultInjectionIOType(type, fresh);
}
|
|
|
|
// Deleter for the thread-local error slots: `p` is interpreted as an
// ErrorContext* and deleted.
static void DeleteThreadLocalErrorContext(void* p) {
  delete static_cast<ErrorContext*>(p);
}
|
|
|
|
IOStatus MaybeInjectThreadLocalError(
|
|
FaultInjectionIOType type, const IOOptions& io_options,
|
|
const std::string& file_name = "", ErrorOperation op = kUnknown,
|
|
Slice* slice = nullptr, bool direct_io = false, char* scratch = nullptr,
|
|
bool need_count_increase = false, bool* fault_injected = nullptr);
|
|
|
|
// Returns the `count` field of the calling thread's context for `type` and
// resets that field to zero. Returns 0 when the thread has no context
// installed for `type`.
int GetAndResetInjectedThreadLocalErrorCount(FaultInjectionIOType type) {
  ErrorContext* const current = GetErrorContextFromFaultInjectionIOType(type);
  if (current == nullptr) {
    return 0;
  }
  const int injected = current->count;
  current->count = 0;
  return injected;
}
|
|
|
|
void SetIOActivtiesExcludedFromFaultInjection(
|
|
const std::set<Env::IOActivity>& io_activties) {
|
|
MutexLock l(&mutex_);
|
|
io_activties_excluded_from_fault_injection = io_activties;
|
|
}
|
|
|
|
void SetFileTypesExcludedFromWriteFaultInjection(
|
|
const std::set<FileType>& types) {
|
|
MutexLock l(&mutex_);
|
|
file_types_excluded_from_write_fault_injection_ = types;
|
|
}
|
|
|
|
void EnableThreadLocalErrorInjection(FaultInjectionIOType type) {
|
|
ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
|
|
if (ctx) {
|
|
ctx->enable_error_injection = true;
|
|
}
|
|
}
|
|
|
|
void EnableAllThreadLocalErrorInjection() {
|
|
EnableThreadLocalErrorInjection(FaultInjectionIOType::kRead);
|
|
EnableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
|
|
EnableThreadLocalErrorInjection(FaultInjectionIOType::kMetadataRead);
|
|
EnableThreadLocalErrorInjection(FaultInjectionIOType::kMetadataWrite);
|
|
}
|
|
|
|
void DisableThreadLocalErrorInjection(FaultInjectionIOType type) {
|
|
ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
|
|
if (ctx) {
|
|
ctx->enable_error_injection = false;
|
|
}
|
|
}
|
|
|
|
void DisableAllThreadLocalErrorInjection() {
|
|
DisableThreadLocalErrorInjection(FaultInjectionIOType::kRead);
|
|
DisableThreadLocalErrorInjection(FaultInjectionIOType::kWrite);
|
|
DisableThreadLocalErrorInjection(FaultInjectionIOType::kMetadataRead);
|
|
DisableThreadLocalErrorInjection(FaultInjectionIOType::kMetadataWrite);
|
|
}
|
|
|
|
void PrintInjectedThreadLocalErrorBacktrace(FaultInjectionIOType type);
|
|
|
|
// If there is unsynced data in the specified file within the specified
|
|
// range [offset, offset + n), return the unsynced data overlapping with
|
|
// that range, in a corresponding range of scratch. When known, also return
|
|
// the position of the last sync, so that the caller can determine whether
|
|
// more data is available from the target file when not available from
|
|
// unsynced.
|
|
void ReadUnsynced(const std::string& fname, uint64_t offset, size_t n,
|
|
Slice* result, char* scratch, int64_t* pos_at_last_sync);
|
|
|
|
inline static const std::string kInjected = "injected";
|
|
|
|
private:
|
|
inline static const std::string kFailedToWriteToWAL =
|
|
"failed to write to WAL";
|
|
port::Mutex mutex_;
|
|
std::map<std::string, FSFileState> db_file_state_;
|
|
std::set<std::string> open_managed_files_;
|
|
// directory -> (file name -> file contents to recover)
|
|
// When data is recovered from an unsynced parent directory, the files with
|
|
// empty file contents to recover is deleted. Those with non-empty ones
|
|
// will be recovered to content accordingly.
|
|
std::unordered_map<std::string, std::map<std::string, std::string>>
|
|
dir_to_new_files_since_last_sync_;
|
|
bool filesystem_active_; // Record flushes, syncs, writes
|
|
bool filesystem_writable_; // Bypass FaultInjectionTestFS and go directly
|
|
// to underlying FS for writable files
|
|
bool inject_unsynced_data_loss_; // See InjectUnsyncedDataLoss()
|
|
bool read_unsynced_data_; // See SetReadUnsyncedData()
|
|
bool allow_link_open_file_; // See SetAllowLinkOpenFile()
|
|
IOStatus fs_error_;
|
|
|
|
// Kind of fault recorded in an ErrorContext. kErrorTypeMax is a count
// sentinel, not a real error type. Values are spelled out explicitly; they
// match the implicit numbering of the original declaration.
enum ErrorType : int {
  kErrorTypeStatus = 0,
  kErrorTypeCorruption = 1,
  kErrorTypeTruncated = 2,
  kErrorTypeMax = 3
};
|
|
|
|
struct ErrorContext {
|
|
Random rand;
|
|
int one_in;
|
|
int count;
|
|
bool enable_error_injection;
|
|
void* callstack;
|
|
std::string message;
|
|
int frames;
|
|
ErrorType type;
|
|
bool retryable;
|
|
bool has_data_loss;
|
|
|
|
explicit ErrorContext(uint32_t seed)
|
|
: rand(seed),
|
|
enable_error_injection(false),
|
|
callstack(nullptr),
|
|
frames(0),
|
|
retryable(false),
|
|
has_data_loss(false) {}
|
|
~ErrorContext() {
|
|
if (callstack) {
|
|
free(callstack);
|
|
}
|
|
}
|
|
};
|
|
|
|
std::set<FileType> file_types_excluded_from_write_fault_injection_;
|
|
std::set<Env::IOActivity> io_activties_excluded_from_fault_injection;
|
|
ThreadLocalPtr injected_thread_local_read_error_;
|
|
ThreadLocalPtr injected_thread_local_write_error_;
|
|
ThreadLocalPtr injected_thread_local_metadata_read_error_;
|
|
ThreadLocalPtr injected_thread_local_metadata_write_error_;
|
|
bool ingest_data_corruption_before_write_;
|
|
ChecksumType checksum_handoff_func_type_;
|
|
bool fail_get_file_unique_id_;
|
|
|
|
// Inject an error. For a READ operation, a status of IOError(), a
|
|
// corruption in the contents of scratch, or truncation of slice
|
|
// are the types of error with equal probability. For OPEN,
|
|
// it's always an IOError.
|
|
// fault_injected returns whether a fault is injected. It is needed
|
|
// because some faults are injected with the IOStatus still being OK.
|
|
IOStatus MaybeInjectThreadLocalReadError(const IOOptions& io_options,
|
|
ErrorOperation op, Slice* slice,
|
|
bool direct_io, char* scratch,
|
|
bool need_count_increase,
|
|
bool* fault_injected);
|
|
|
|
bool ShouldExcludeFromWriteFaultInjection(const std::string& file_name) {
|
|
MutexLock l(&mutex_);
|
|
FileType file_type = kTempFile;
|
|
uint64_t file_number = 0;
|
|
if (!TryParseFileName(file_name, &file_number, &file_type)) {
|
|
return false;
|
|
}
|
|
return file_types_excluded_from_write_fault_injection_.find(file_type) !=
|
|
file_types_excluded_from_write_fault_injection_.end();
|
|
}
|
|
|
|
// Extract number and type from file name. Return false if failing to find
|
|
// them.
|
|
bool TryParseFileName(const std::string& file_name, uint64_t* number,
|
|
FileType* type);
|
|
|
|
// Returns the calling thread's ErrorContext for `type`, or nullptr when the
// thread has none installed. An out-of-range `type` trips an assert in
// debug builds and yields nullptr otherwise.
ErrorContext* GetErrorContextFromFaultInjectionIOType(
    FaultInjectionIOType type) {
  void* raw = nullptr;
  switch (type) {
    case FaultInjectionIOType::kRead:
      raw = injected_thread_local_read_error_.Get();
      break;
    case FaultInjectionIOType::kWrite:
      raw = injected_thread_local_write_error_.Get();
      break;
    case FaultInjectionIOType::kMetadataRead:
      raw = injected_thread_local_metadata_read_error_.Get();
      break;
    case FaultInjectionIOType::kMetadataWrite:
      raw = injected_thread_local_metadata_write_error_.Get();
      break;
    default:
      assert(false);
      break;
  }
  return static_cast<ErrorContext*>(raw);
}
|
|
|
|
// Installs `new_ctx` as the calling thread's ErrorContext for `type`, taking
// ownership of it, and deletes whatever context was installed before.
//
// An out-of-range `type` trips an assert in debug builds. In release builds
// the previous code silently dropped `new_ctx` in that case and leaked it;
// it is now deleted, since no slot can adopt it.
void SetErrorContextOfFaultInjectionIOType(FaultInjectionIOType type,
                                           ErrorContext* new_ctx) {
  ThreadLocalPtr* slot = nullptr;
  switch (type) {
    case FaultInjectionIOType::kRead:
      slot = &injected_thread_local_read_error_;
      break;
    case FaultInjectionIOType::kWrite:
      slot = &injected_thread_local_write_error_;
      break;
    case FaultInjectionIOType::kMetadataRead:
      slot = &injected_thread_local_metadata_read_error_;
      break;
    case FaultInjectionIOType::kMetadataWrite:
      slot = &injected_thread_local_metadata_write_error_;
      break;
    default:
      assert(false);
      break;
  }

  if (slot == nullptr) {
    // No slot owns new_ctx, so release it here instead of leaking it.
    DeleteThreadLocalErrorContext(new_ctx);
    return;
  }

  ErrorContext* old_ctx = static_cast<ErrorContext*>(slot->Swap(new_ctx));
  if (old_ctx != nullptr) {
    DeleteThreadLocalErrorContext(old_ctx);
  }
}
|
|
|
|
// Builds the message attached to an injected error:
//   "<kInjected> <read|write|metadata read|metadata write> error"
// and, for a write-type fault on an open/append/positioned-append of a file
// whose name parses as a WAL file, a trailing " <kFailedToWriteToWAL>".
// IsInjectedError() and IsFailedToWriteToWALError() look for these markers.
//
// The message is assembled with std::string appends rather than
// std::ostringstream: this header does not include <sstream>, and the text
// produced is identical.
std::string GetErrorMessage(FaultInjectionIOType type,
                            const std::string& file_name, ErrorOperation op) {
  std::string msg = kInjected;
  msg += ' ';
  switch (type) {
    case FaultInjectionIOType::kRead:
      msg += "read error";
      break;
    case FaultInjectionIOType::kWrite:
      msg += "write error";
      break;
    case FaultInjectionIOType::kMetadataRead:
      msg += "metadata read error";
      break;
    case FaultInjectionIOType::kMetadataWrite:
      msg += "metadata write error";
      break;
    default:
      assert(false);
      break;
  }

  const bool is_write_path_op = op == ErrorOperation::kOpen ||
                                op == ErrorOperation::kAppend ||
                                op == ErrorOperation::kPositionedAppend;
  if (type == FaultInjectionIOType::kWrite && is_write_path_op) {
    FileType file_type = kTempFile;
    uint64_t ignore = 0;
    if (TryParseFileName(file_name, &ignore, &file_type) &&
        file_type == FileType::kWalFile) {
      msg += ' ';
      msg += kFailedToWriteToWAL;
    }
  }
  return msg;
}
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|