mirror of https://github.com/facebook/rocksdb.git
Inject retryable write IOError when writing to SST files in stress test (#11829)
Summary: * db_crashtest.py may now set `write_fault_one_in` to 500 for the blackbox and whitebox simple tests. * Error injection only applies to writes to SST files. A flush error will cause the DB to pause background operations and auto-resume. A compaction error will just cause the compaction to be re-scheduled later. * File ingestion and backup tests are updated to check whether a failed result status is due to an injected error. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11829 Test Plan: a full round of whitebox simple and blackbox simple crash tests * `python3 ./tools/db_crashtest.py whitebox/blackbox --simple --write_fault_one_in=500` Reviewed By: ajkr Differential Revision: D49256962 Pulled By: cbi42 fbshipit-source-id: 68e0c9648d8e03bad39c7672b25d5500fc286d97
This commit is contained in:
parent
cc254efea6
commit
c90807d103
|
@ -81,12 +81,30 @@ bool RunStressTestImpl(SharedState* shared) {
|
|||
stress->InitDb(shared);
|
||||
stress->FinishInitDb(shared);
|
||||
|
||||
if (FLAGS_write_fault_one_in) {
|
||||
if (!FLAGS_sync_fault_injection) {
|
||||
// unsynced WAL loss is not supported without sync_fault_injection
|
||||
fault_fs_guard->SetDirectWritableTypes({kWalFile});
|
||||
}
|
||||
IOStatus error_msg;
|
||||
if (FLAGS_inject_error_severity <= 1 || FLAGS_inject_error_severity > 2) {
|
||||
error_msg = IOStatus::IOError("Retryable injected write error");
|
||||
error_msg.SetRetryable(true);
|
||||
} else if (FLAGS_inject_error_severity == 2) {
|
||||
error_msg = IOStatus::IOError("Fatal injected write error");
|
||||
error_msg.SetDataLoss(true);
|
||||
}
|
||||
// TODO: inject write error for other file types including
|
||||
// MANIFEST, CURRENT, and WAL files.
|
||||
fault_fs_guard->SetRandomWriteError(
|
||||
shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
|
||||
/*inject_for_all_file_types=*/false, {FileType::kTableFile});
|
||||
fault_fs_guard->SetFilesystemDirectWritable(false);
|
||||
fault_fs_guard->EnableWriteErrorInjection();
|
||||
}
|
||||
if (FLAGS_sync_fault_injection) {
|
||||
fault_fs_guard->SetFilesystemDirectWritable(false);
|
||||
}
|
||||
if (FLAGS_write_fault_one_in) {
|
||||
fault_fs_guard->EnableWriteErrorInjection();
|
||||
}
|
||||
|
||||
uint32_t n = FLAGS_threads;
|
||||
uint64_t now = clock->NowMicros();
|
||||
|
|
|
@ -1007,7 +1007,8 @@ DEFINE_string(file_checksum_impl, "none",
|
|||
"\"none\" for null.");
|
||||
|
||||
DEFINE_int32(write_fault_one_in, 0,
|
||||
"On non-zero, enables fault injection on write");
|
||||
"On non-zero, enables fault injection on write. Currently only "
|
||||
"injects write error when writing to SST files.");
|
||||
|
||||
DEFINE_uint64(user_timestamp_size, 0,
|
||||
"Number of bytes for a user-defined timestamp. Currently, only "
|
||||
|
|
|
@ -785,23 +785,6 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||
FLAGS_inject_error_severity == 1 /* retryable */);
|
||||
}
|
||||
#endif // NDEBUG
|
||||
if (FLAGS_write_fault_one_in) {
|
||||
IOStatus error_msg;
|
||||
if (FLAGS_inject_error_severity <= 1 || FLAGS_inject_error_severity > 2) {
|
||||
error_msg = IOStatus::IOError("Retryable IO Error");
|
||||
error_msg.SetRetryable(true);
|
||||
} else if (FLAGS_inject_error_severity == 2) {
|
||||
// Inject a fatal error
|
||||
error_msg = IOStatus::IOError("Fatal IO Error");
|
||||
error_msg.SetDataLoss(true);
|
||||
}
|
||||
std::vector<FileType> types = {FileType::kTableFile,
|
||||
FileType::kDescriptorFile,
|
||||
FileType::kCurrentFile};
|
||||
fault_fs_guard->SetRandomWriteError(
|
||||
thread->shared->GetSeed(), FLAGS_write_fault_one_in, error_msg,
|
||||
/*inject_for_all_file_types=*/false, types);
|
||||
}
|
||||
thread->stats.Start();
|
||||
for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
|
||||
if (thread->shared->HasVerificationFailedYet() ||
|
||||
|
@ -1004,8 +987,13 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||
if (total_size <= FLAGS_backup_max_size) {
|
||||
Status s = TestBackupRestore(thread, rand_column_families, rand_keys);
|
||||
if (!s.ok()) {
|
||||
VerificationAbort(shared, "Backup/restore gave inconsistent state",
|
||||
s);
|
||||
if (!s.IsIOError() || !std::strstr(s.getState(), "injected")) {
|
||||
VerificationAbort(shared,
|
||||
"Backup/restore gave inconsistent state", s);
|
||||
} else {
|
||||
fprintf(stdout, "Backup/restore failed: %s\n",
|
||||
s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1013,7 +1001,11 @@ void StressTest::OperateDb(ThreadState* thread) {
|
|||
if (thread->rand.OneInOpt(FLAGS_checkpoint_one_in)) {
|
||||
Status s = TestCheckpoint(thread, rand_column_families, rand_keys);
|
||||
if (!s.ok()) {
|
||||
VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
|
||||
if (!s.IsIOError() || !std::strstr(s.getState(), "injected")) {
|
||||
VerificationAbort(shared, "Checkpoint gave inconsistent state", s);
|
||||
} else {
|
||||
fprintf(stdout, "Checkpoint failed: %s\n", s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2699,6 +2691,9 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
FLAGS_db, options_.db_paths, cf_descriptors, db_stress_listener_env));
|
||||
RegisterAdditionalListeners();
|
||||
|
||||
// If this is for DB reopen, write error injection may have been enabled.
|
||||
// Disable it here in case there is no open fault injection.
|
||||
fault_fs_guard->DisableWriteErrorInjection();
|
||||
if (!FLAGS_use_txn) {
|
||||
// Determine whether we need to inject file metadata write failures
|
||||
// during DB reopen. If it does, enable it.
|
||||
|
@ -2718,7 +2713,7 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
// WAL is durable. Buffering unsynced writes will cause false
|
||||
// positive in crash tests. Before we figure out a way to
|
||||
// solve it, skip WAL from failure injection.
|
||||
fault_fs_guard->SetSkipDirectWritableTypes({kWalFile});
|
||||
fault_fs_guard->SetDirectWritableTypes({kWalFile});
|
||||
}
|
||||
inject_meta_error = FLAGS_open_metadata_write_fault_one_in;
|
||||
inject_write_error = FLAGS_open_write_fault_one_in;
|
||||
|
@ -2733,7 +2728,7 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
fault_fs_guard->EnableWriteErrorInjection();
|
||||
fault_fs_guard->SetRandomWriteError(
|
||||
static_cast<uint32_t>(FLAGS_seed), FLAGS_open_write_fault_one_in,
|
||||
IOStatus::IOError("Injected Open Error"),
|
||||
IOStatus::IOError("Injected Open Write Error"),
|
||||
/*inject_for_all_file_types=*/true, /*types=*/{});
|
||||
}
|
||||
if (inject_read_error) {
|
||||
|
@ -2769,10 +2764,12 @@ void StressTest::Open(SharedState* shared, bool reopen) {
|
|||
}
|
||||
|
||||
if (inject_meta_error || inject_write_error || inject_read_error) {
|
||||
// TODO: re-enable write error injection after reopen. Same for
|
||||
// sync fault injection.
|
||||
fault_fs_guard->SetFilesystemDirectWritable(true);
|
||||
fault_fs_guard->DisableMetadataWriteErrorInjection();
|
||||
fault_fs_guard->DisableWriteErrorInjection();
|
||||
fault_fs_guard->SetSkipDirectWritableTypes({});
|
||||
fault_fs_guard->SetDirectWritableTypes({});
|
||||
fault_fs_guard->SetRandomReadError(0);
|
||||
if (s.ok()) {
|
||||
// Injected errors might happen in background compactions. We
|
||||
|
|
|
@ -88,11 +88,6 @@ int db_stress_tool(int argc, char** argv) {
|
|||
FaultInjectionTestFS* fs =
|
||||
new FaultInjectionTestFS(raw_env->GetFileSystem());
|
||||
fault_fs_guard.reset(fs);
|
||||
if (FLAGS_write_fault_one_in) {
|
||||
fault_fs_guard->SetFilesystemDirectWritable(false);
|
||||
} else {
|
||||
fault_fs_guard->SetFilesystemDirectWritable(true);
|
||||
}
|
||||
fault_env_guard =
|
||||
std::make_shared<CompositeEnvWrapper>(raw_env, fault_fs_guard);
|
||||
raw_env = fault_env_guard.get();
|
||||
|
|
|
@ -1568,11 +1568,13 @@ class NonBatchedOpsStressTest : public StressTest {
|
|||
}
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
|
||||
thread->shared->SafeTerminate();
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < pending_expected_values.size(); ++i) {
|
||||
pending_expected_values[i].Commit();
|
||||
if (!s.IsIOError() || !std::strstr(s.getState(), "injected")) {
|
||||
thread->shared->SafeTerminate();
|
||||
}
|
||||
} else {
|
||||
for (size_t i = 0; i < pending_expected_values.size(); ++i) {
|
||||
pending_expected_values[i].Commit();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -179,6 +179,7 @@ default_params = {
|
|||
"max_key_len": 3,
|
||||
"key_len_percent_dist": "1,30,69",
|
||||
"read_fault_one_in": lambda: random.choice([0, 32, 1000]),
|
||||
"write_fault_one_in": lambda: random.choice([0, 500]),
|
||||
"open_metadata_write_fault_one_in": lambda: random.choice([0, 0, 8]),
|
||||
"open_write_fault_one_in": lambda: random.choice([0, 0, 16]),
|
||||
"open_read_fault_one_in": lambda: random.choice([0, 0, 32]),
|
||||
|
@ -374,6 +375,10 @@ cf_consistency_params = {
|
|||
# use small value for write_buffer_size so that RocksDB triggers flush
|
||||
# more frequently
|
||||
"write_buffer_size": 1024 * 1024,
|
||||
# Small write buffer size with more frequent flush has a higher chance
|
||||
# of hitting write error. DB may be stopped if memtable fills up during
|
||||
# auto resume.
|
||||
"write_fault_one_in": 0,
|
||||
"enable_pipelined_write": lambda: random.randint(0, 1),
|
||||
# Snapshots are used heavily in this test mode, while they are incompatible
|
||||
# with compaction filter.
|
||||
|
@ -506,6 +511,9 @@ multiops_txn_default_params = {
|
|||
"enable_compaction_filter": 0,
|
||||
"create_timestamped_snapshot_one_in": 50,
|
||||
"sync_fault_injection": 0,
|
||||
# This test has aggressive flush frequency and small write buffer size.
|
||||
# Disabling write fault to avoid writes being stopped.
|
||||
"write_fault_one_in": 0,
|
||||
# PutEntity in transactions is not yet implemented
|
||||
"use_put_entity_one_in": 0,
|
||||
"use_get_entity": 0,
|
||||
|
@ -671,7 +679,9 @@ def finalize_and_sanitize(src_params):
|
|||
dest_params["use_full_merge_v1"] = 0
|
||||
if dest_params["file_checksum_impl"] == "none":
|
||||
dest_params["verify_file_checksums_one_in"] = 0
|
||||
|
||||
if dest_params["write_fault_one_in"] > 0:
|
||||
# background work may be disabled while DB is resuming after some error
|
||||
dest_params["max_write_buffer_number"] = max(dest_params["max_write_buffer_number"], 6)
|
||||
return dest_params
|
||||
|
||||
|
||||
|
|
|
@ -408,7 +408,7 @@ IOStatus TestFSRandomAccessFile::Read(uint64_t offset, size_t n,
|
|||
scratch, /*need_count_increase=*/true, /*fault_injected=*/nullptr);
|
||||
}
|
||||
if (s.ok() && fs_->ShouldInjectRandomReadError()) {
|
||||
return IOStatus::IOError("Injected read error");
|
||||
return IOStatus::IOError("injected read error");
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -430,7 +430,7 @@ IOStatus TestFSRandomAccessFile::ReadAsync(
|
|||
}
|
||||
if (ret.ok()) {
|
||||
if (fs_->ShouldInjectRandomReadError()) {
|
||||
ret = IOStatus::IOError("Injected read error");
|
||||
ret = IOStatus::IOError("injected read error");
|
||||
} else {
|
||||
s = target_->ReadAsync(req, opts, cb, cb_arg, io_handle, del_fn, nullptr);
|
||||
}
|
||||
|
@ -470,7 +470,7 @@ IOStatus TestFSRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs,
|
|||
/*fault_injected=*/nullptr);
|
||||
}
|
||||
if (s.ok() && fs_->ShouldInjectRandomReadError()) {
|
||||
return IOStatus::IOError("Injected read error");
|
||||
return IOStatus::IOError("injected read error");
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -487,7 +487,7 @@ IOStatus TestFSSequentialFile::Read(size_t n, const IOOptions& options,
|
|||
IODebugContext* dbg) {
|
||||
IOStatus s = target()->Read(n, options, result, scratch, dbg);
|
||||
if (s.ok() && fs_->ShouldInjectRandomReadError()) {
|
||||
return IOStatus::IOError("Injected seq read error");
|
||||
return IOStatus::IOError("injected seq read error");
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -499,7 +499,7 @@ IOStatus TestFSSequentialFile::PositionedRead(uint64_t offset, size_t n,
|
|||
IOStatus s =
|
||||
target()->PositionedRead(offset, n, options, result, scratch, dbg);
|
||||
if (s.ok() && fs_->ShouldInjectRandomReadError()) {
|
||||
return IOStatus::IOError("Injected seq positioned read error");
|
||||
return IOStatus::IOError("injected seq positioned read error");
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -678,7 +678,7 @@ IOStatus FaultInjectionTestFS::NewRandomAccessFile(
|
|||
return GetError();
|
||||
}
|
||||
if (ShouldInjectRandomReadError()) {
|
||||
return IOStatus::IOError("Injected error when open random access file");
|
||||
return IOStatus::IOError("injected error when open random access file");
|
||||
}
|
||||
IOStatus io_s = InjectThreadSpecificReadError(ErrorOperation::kOpen, nullptr,
|
||||
false, nullptr,
|
||||
|
@ -701,7 +701,7 @@ IOStatus FaultInjectionTestFS::NewSequentialFile(
|
|||
}
|
||||
|
||||
if (ShouldInjectRandomReadError()) {
|
||||
return IOStatus::IOError("Injected read error when creating seq file");
|
||||
return IOStatus::IOError("injected read error when creating seq file");
|
||||
}
|
||||
IOStatus io_s = target()->NewSequentialFile(fname, file_opts, result, dbg);
|
||||
if (io_s.ok()) {
|
||||
|
@ -971,15 +971,15 @@ IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(
|
|||
|
||||
if (op != ErrorOperation::kMultiReadSingleReq) {
|
||||
// Likely non-per read status code for MultiRead
|
||||
ctx->message += "error; ";
|
||||
ctx->message += "injected read error; ";
|
||||
ret_fault_injected = true;
|
||||
ret = IOStatus::IOError();
|
||||
ret = IOStatus::IOError(ctx->message);
|
||||
} else if (Random::GetTLSInstance()->OneIn(8)) {
|
||||
assert(result);
|
||||
// For a small chance, set the failure to status but turn the
|
||||
// result to be empty, which is supposed to be caught for a check.
|
||||
*result = Slice();
|
||||
ctx->message += "inject empty result; ";
|
||||
ctx->message += "injected empty result; ";
|
||||
ret_fault_injected = true;
|
||||
} else if (!direct_io && Random::GetTLSInstance()->OneIn(7) &&
|
||||
scratch != nullptr && result->data() == scratch) {
|
||||
|
@ -996,12 +996,12 @@ IOStatus FaultInjectionTestFS::InjectThreadSpecificReadError(
|
|||
// It would work for CRC. Not 100% sure for xxhash and will adjust
|
||||
// if it is not the case.
|
||||
const_cast<char*>(result->data())[result->size() - 1]++;
|
||||
ctx->message += "corrupt last byte; ";
|
||||
ctx->message += "injected corrupt last byte; ";
|
||||
ret_fault_injected = true;
|
||||
} else {
|
||||
ctx->message += "error result multiget single; ";
|
||||
ctx->message += "injected error result multiget single; ";
|
||||
ret_fault_injected = true;
|
||||
ret = IOStatus::IOError();
|
||||
ret = IOStatus::IOError(ctx->message);
|
||||
}
|
||||
}
|
||||
if (ctx->retryable) {
|
||||
|
@ -1056,7 +1056,7 @@ IOStatus FaultInjectionTestFS::InjectMetadataWriteError() {
|
|||
}
|
||||
}
|
||||
TEST_SYNC_POINT("FaultInjectionTestFS::InjectMetadataWriteError:Injected");
|
||||
return IOStatus::IOError();
|
||||
return IOStatus::IOError("injected metadata write error");
|
||||
}
|
||||
|
||||
void FaultInjectionTestFS::PrintFaultBacktrace() {
|
||||
|
|
|
@ -323,8 +323,8 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
if (!TryParseFileName(file_name, &file_number, &file_type)) {
|
||||
return false;
|
||||
}
|
||||
return skip_direct_writable_types_.find(file_type) !=
|
||||
skip_direct_writable_types_.end();
|
||||
return direct_writable_types_.find(file_type) !=
|
||||
direct_writable_types_.end();
|
||||
}
|
||||
void SetFilesystemActiveNoLock(
|
||||
bool active, IOStatus error = IOStatus::Corruption("Not active")) {
|
||||
|
@ -439,9 +439,9 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
write_error_allowed_types_ = types;
|
||||
}
|
||||
|
||||
void SetSkipDirectWritableTypes(const std::set<FileType>& types) {
|
||||
void SetDirectWritableTypes(const std::set<FileType>& types) {
|
||||
MutexLock l(&mutex_);
|
||||
skip_direct_writable_types_ = types;
|
||||
direct_writable_types_ = types;
|
||||
}
|
||||
|
||||
void SetRandomMetadataWriteError(int one_in) {
|
||||
|
@ -583,7 +583,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
|
|||
bool inject_for_all_file_types_;
|
||||
std::vector<FileType> write_error_allowed_types_;
|
||||
// File types where direct writable is skipped.
|
||||
std::set<FileType> skip_direct_writable_types_;
|
||||
std::set<FileType> direct_writable_types_;
|
||||
bool ingest_data_corruption_before_write_;
|
||||
ChecksumType checksum_handoff_func_tpye_;
|
||||
bool fail_get_file_unique_id_;
|
||||
|
|
Loading…
Reference in New Issue