mirror of https://github.com/facebook/rocksdb.git
Fix false-positive TestBackupRestore corruption (#12917)
Summary:
**Context:**
https://github.com/facebook/rocksdb/pull/12838 allows a write thread that encountered certain injected errors to release the lock and sleep before retrying the write, in order to reduce the performance cost. This requires adding checks like [this](b26b395e0a/db_stress_tool/expected_value.cc (L29-L31)) to prevent another thread from writing to the same key.
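For reference, the kind of check being referred to looks roughly like the following (a condensed sketch of the pre-#12917 `ExpectedValue::Put()`, reconstructed from the hunks in the diff further below; elided branches are omitted):
```
// Sketch only: preparing a write fails if a previous write or delete on the
// same key is still in a pending state, so the caller sees `prepared == false`
// and bails out instead of clobbering the other thread's pending operation.
bool ExpectedValue::Put(bool pending) {
  if (pending && (PendingWrite() || PendingDelete())) {
    return false;
  }
  if (pending) {
    SetPendingWrite();
  } else {
    // ... settle the final value and clear the pending/deleted markers ...
    ClearDeleted();
    ClearPendingWrite();
  }
  return true;
}
```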
The added check causes a false-positive failure when delete range, file ingestion, and backup are used together. Consider the following scenario:
(1) Issue a delete range covering some keys that do not exist and one key that does exist (call it k1). k1 gets a "pending delete" state, while the non-existent keys keep whatever state they already have, since we don't delete a key that does not already exist.
(2) After https://github.com/facebook/rocksdb/pull/12838, `PrepareDeleteRange(... &prepared)` will return `prepared = false`. So the logic below is executed and k1's "pending delete" state is neither rolled back nor committed.
```
std::vector<PendingExpectedValue> pending_expected_values =
    shared->PrepareDeleteRange(rand_column_family, rand_key,
                               rand_key + FLAGS_range_deletion_width,
                               &prepared);
if (!prepared) {
  for (PendingExpectedValue& pending_expected_value :
       pending_expected_values) {
    pending_expected_value.PermitUnclosedPendingState();
  }
  return s;
}
```
(3) Issue a file ingestion covering k1 and another key k2. Similar to (2), `shared->PreparePut(column_family, key, &prepared)` returns `prepared = false` for k1, while k2 is left in a "pending put" state. So the logic below is executed and k2's "pending put" state is neither rolled back nor committed.
```
for (int64_t key = key_base;
     s.ok() && key < shared->GetMaxKey() &&
     static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
     ++key) {
  ...
  PendingExpectedValue pending_expected_value =
      shared->PreparePut(column_family, key, &prepared);
  if (!prepared) {
    pending_expected_value.PermitUnclosedPendingState();
    for (PendingExpectedValue& pev : pending_expected_values) {
      pev.PermitUnclosedPendingState();
    }
    return;
  }
  ...
}
```
(4) Issue a backup and verify k2. The logic below decides that k2 should exist in the restored DB since it has a pending write state, while k2 was never actually ingested into the original DB because (3) returned early.
```
bool Exists() const { return PendingWrite() || !IsDeleted(); }

TestBackupRestore() {
  ...
  Status get_status = restored_db->Get(
      read_opts, restored_cf_handles[rand_column_families[i]], key,
      &restored_value);
  bool exists = thread->shared->Exists(rand_column_families[i], rand_keys[0]);
  if (get_status.ok()) {
    if (!exists && from_latest && ShouldAcquireMutexOnKey()) {
      std::ostringstream oss;
      oss << "0x" << key.ToString(true)
          << " exists in restore but not in original db";
      s = Status::Corruption(oss.str());
    }
  } else if (get_status.IsNotFound()) {
    if (exists && from_latest && ShouldAcquireMutexOnKey()) {
      std::ostringstream oss;
      oss << "0x" << key.ToString(true)
          << " exists in original db but not in restore";
      s = Status::Corruption(oss.str());
    }
  }
  ...
}
```
So we see a false-positive corruption report like `Failure in a backup/restore operation with: Corruption: 0x000000000000017B0000000000000073787878 exists in original db but not in restore`.
A simple fix is to remove `PendingWrite()` from `bool Exists()` since it is called under a lock and should never see a pending write. However, for "under a lock and should never see a pending write" to hold, we need to remove the logic that releases the lock during the sleep in the write thread, which exposes the pending write to other threads that can call Exists(), such as the backup thread.
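Concretely, the change to `ExpectedValue::Exists()` looks roughly like this (a sketch matching the corresponding hunk in the diff below):
```
// Before: a key with a pending write counted as existing, which is what
// produced the false positive above.
bool Exists() const { return PendingWrite() || !IsDeleted(); }

// After: Exists() is only called while the per-key lock is held, so it
// should never observe a pending state.
bool Exists() const {
  assert(!PendingWrite() && !PendingDelete());
  return !IsDeleted();
}
```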
The downside of holding the lock during the sleep is that it blocks other write threads targeting the same key, since they have to wait for the lock. This should happen rarely, as each thread's key is selected randomly in the crash test, as shown below.
```
void StressTest::OperateDb(ThreadState* thread) {
  for (uint64_t i = 0; i < ops_per_open; i++) {
    ...
    int64_t rand_key = GenerateOneKey(thread, i);
    ...
  }
}
```
**Summary:**
- Removed the "lock release" logic and the related checks
- Print the recovery wait time if the write thread waited for more than 10 seconds (see the sketch after this list)
- Reverted the regression in test coverage when deleting a non-existent key
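A condensed sketch of how the retried write path now reports long recovery waits (based on the `TestPut()` hunks in the diff below; `...` elides unrelated logic):
```
// Sketch only: the per-key lock is now held across the sleep, and two new
// helpers track and report how long the thread waited for error recovery.
uint64_t wait_for_recover_start_time = 0;
do {
  // ... issue the write; on an injected retryable error with a possibly
  // successful WAL write, sleep one second and retry until recovery finishes ...
  UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
                            &initial_wal_write_may_succeed,
                            &wait_for_recover_start_time);
} while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
         initial_wal_write_may_succeed);
// On success, report the wait if it exceeded 10 seconds.
PrintWriteRecoveryWaitTimeIfNeeded(db_stress_env, initial_write_s,
                                   initial_wal_write_may_succeed,
                                   wait_for_recover_start_time, "TestPut");
pending_expected_value.Commit();
```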
Pull Request resolved: https://github.com/facebook/rocksdb/pull/12917
Test Plan:
The command below repro-ed the issue frequently before the fix and not after.
```
./db_stress --WAL_size_limit_MB=1 --WAL_ttl_seconds=60 --acquire_snapshot_one_in=0 --adaptive_readahead=0 --adm_policy=1 --advise_random_on_open=1 --allow_concurrent_memtable_write=0 --allow_data_in_errors=True --allow_fallocate=0 --allow_setting_blob_options_dynamically=1 --async_io=0 --auto_readahead_size=1 --avoid_flush_during_recovery=0 --avoid_flush_during_shutdown=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=0 --bgerror_resume_retry_interval=100 --blob_cache_size=8388608 --blob_compaction_readahead_size=1048576 --blob_compression_type=none --blob_file_size=1073741824 --blob_file_starting_level=1 --blob_garbage_collection_age_cutoff=0.0 --blob_garbage_collection_force_threshold=0.75 --block_align=0 --block_protection_bytes_per_key=8 --block_size=16384 --bloom_before_level=2147483647 --bloom_bits=16.216959977115277 --bottommost_compression_type=xpress --bottommost_file_compaction_delay=600 --bytes_per_sync=262144 --cache_index_and_filter_blocks=1 --cache_index_and_filter_blocks_with_high_priority=1 --cache_size=8388608 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=0 --charge_table_reader=1 --check_multiget_consistency=0 --check_multiget_entity_consistency=0 --checkpoint_one_in=1000000 --checksum_type=kXXH3 --clear_column_family_one_in=0 --column_families=1 --compact_files_one_in=1000 --compact_range_one_in=0 --compaction_pri=3 --compaction_readahead_size=0 --compaction_ttl=10 --compress_format_version=2 --compressed_secondary_cache_size=8388608 --compression_checksum=0 --compression_max_dict_buffer_bytes=2097151 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=zlib --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --daily_offpeak_time_utc=04:00-08:00 --data_block_index_type=0 --db=/dev/shm/rocksdb_test/rocksdb_crashtest_whitebox --db_write_buffer_size=0 --default_temperature=kUnknown --default_write_temperature=kWarm --delete_obsolete_files_period_micros=21600000000 --delpercent=0 --delrangepercent=5 --destroy_db_initially=0 --detect_filter_construct_corruption=1 --disable_file_deletions_one_in=10000 --disable_manual_compaction_one_in=1000000 --disable_wal=0 --dump_malloc_stats=0 --enable_blob_files=0 --enable_blob_garbage_collection=1 --enable_checksum_handoff=1 --enable_compaction_filter=1 --enable_custom_split_merge=1 --enable_do_not_compress_roles=0 --enable_index_compression=1 --enable_memtable_insert_with_hint_prefix_extractor=0 --enable_pipelined_write=1 --enable_sst_partitioner_factory=1 --enable_thread_tracking=0 --enable_write_thread_adaptive_yield=0 --error_recovery_with_no_fault_injection=1 --exclude_wal_from_write_fault_injection=1 --expected_values_dir=/dev/shm/rocksdb_test/rocksdb_crashtest_expected --fail_if_options_file_error=0 --fifo_allow_compaction=1 --file_checksum_impl=big --fill_cache=1 --flush_one_in=1000000 --format_version=2 --get_all_column_family_metadata_one_in=10000 --get_current_wal_file_one_in=0 --get_live_files_apis_one_in=1000000 --get_properties_of_all_tables_one_in=100000 --get_property_one_in=100000 --get_sorted_wal_files_one_in=0 --hard_pending_compaction_bytes_limit=2097152 --high_pri_pool_ratio=0.5 --index_block_restart_interval=1 --index_shortening=2 --index_type=0 --ingest_external_file_one_in=1000 --initial_auto_readahead_size=0 --inplace_update_support=0 --iterpercent=0 
--key_len_percent_dist=1,30,69 --key_may_exist_one_in=100 --last_level_temperature=kUnknown --level_compaction_dynamic_level_bytes=0 --lock_wal_one_in=10000 --log2_keys_per_lock=10 --log_file_time_to_roll=0 --log_readahead_size=0 --long_running_snapshots=1 --low_pri_pool_ratio=0.5 --lowest_used_cache_tier=1 --manifest_preallocation_size=0 --manual_wal_flush_one_in=0 --mark_for_compaction_one_file_in=10 --max_auto_readahead_size=16384 --max_background_compactions=1 --max_bytes_for_level_base=67108864 --max_key=100000 --max_key_len=3 --max_log_file_size=1048576 --max_manifest_file_size=1073741824 --max_sequential_skip_in_iterations=16 --max_total_wal_size=0 --max_write_batch_group_size_bytes=16 --max_write_buffer_number=10 --max_write_buffer_size_to_maintain=8388608 --memtable_insert_hint_per_batch=1 --memtable_max_range_deletions=1000 --memtable_prefix_bloom_size_ratio=0.001 --memtable_protection_bytes_per_key=4 --memtable_whole_key_filtering=1 --memtablerep=skip_list --metadata_charge_policy=1 --metadata_read_fault_one_in=0 --metadata_write_fault_one_in=0 --min_blob_size=16 --min_write_buffer_number_to_merge=2 --mmap_read=0 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=-1 --open_metadata_read_fault_one_in=0 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=20000000 --optimize_filters_for_hits=1 --optimize_filters_for_memory=0 --optimize_multiget_for_io=1 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=1 --pause_background_one_in=10000 --periodic_compaction_seconds=10 --prefix_size=8 --prefixpercent=0 --prepopulate_blob_cache=1 --prepopulate_block_cache=1 --preserve_internal_time_seconds=0 --progress_reports=0 --promote_l0_one_in=0 --read_amp_bytes_per_bit=0 --read_fault_one_in=0 --readahead_size=524288 --readpercent=60 --recycle_log_file_num=1 --reopen=20 --report_bg_io_stats=0 --reset_stats_one_in=1000000 --sample_for_compression=5 --secondary_cache_fault_one_in=0 --secondary_cache_uri= --skip_stats_update_on_db_open=1 --snapshot_hold_ops=100000 --soft_pending_compaction_bytes_limit=68719476736 --sqfc_name=foo --sqfc_version=1 --sst_file_manager_bytes_per_sec=0 --sst_file_manager_bytes_per_truncate=0 --stats_dump_period_sec=10 --stats_history_buffer_size=1048576 --strict_bytes_per_sync=1 --subcompactions=2 --sync=0 --sync_fault_injection=0 --table_cache_numshardbits=0 --target_file_size_base=16777216 --target_file_size_multiplier=1 --test_batches_snapshots=0 --top_level_index_pinning=3 --uncache_aggressiveness=118 --universal_max_read_amp=-1 --unpartitioned_pinning=0 --use_adaptive_mutex=0 --use_adaptive_mutex_lru=1 --use_attribute_group=0 --use_blob_cache=0 --use_delta_encoding=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_get_entity=0 --use_merge=0 --use_multi_cf_iterator=0 --use_multi_get_entity=0 --use_multiget=1 --use_put_entity_one_in=0 --use_shared_block_and_blob_cache=1 --use_sqfc_for_range_queries=1 --use_timed_put_one_in=0 --use_write_buffer_manager=0 --user_timestamp_size=0 --value_size_mult=32 --verification_only=0 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_compression=0 --verify_db_one_in=10000 --verify_file_checksums_one_in=1000000 --verify_iterator_with_expected_state_one_in=5 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none --write_buffer_size=33554432 --write_dbid_to_manifest=0 --write_fault_one_in=0 --writepercent=35
```
Reviewed By: cbi42
Differential Revision: D60890580
Pulled By: hx235
fbshipit-source-id: 401f90d6d351c7ee11088cad06fb00e54062d416
parent 16c21afc06
commit 112bf15dca
@@ -262,14 +262,10 @@ class SharedState {
   // This is useful for crash-recovery testing when the process may crash
   // before updating the corresponding expected value
   //
-  // It can fail and `*prepared` will be set to false if the previous write or
-  // delete is still in pending state (e.g, still in recovery for retryable IO
-  // errors). If succeeds,`*prepared` will be set to true
-  //
   // Requires external locking covering `key` in `cf` to prevent
   // concurrent write or delete to the same `key`.
-  PendingExpectedValue PreparePut(int cf, int64_t key, bool* prepared) {
-    return expected_state_manager_->PreparePut(cf, key, prepared);
+  PendingExpectedValue PreparePut(int cf, int64_t key) {
+    return expected_state_manager_->PreparePut(cf, key);
   }

   // Does not requires external locking.

@@ -281,31 +277,24 @@ class SharedState {
   // This is useful for crash-recovery testing when the process may crash
   // before updating the corresponding expected value
   //
-  // It can fail and `*prepared` will be set to false if the previous write or
-  // delete is still in pending state (e.g, still in recovery for retryable IO
-  // errors). If succeeds,`*prepared` will be set to true
-  //
   // Requires external locking covering `key` in `cf` to prevent concurrent
   // write or delete to the same `key`.
-  PendingExpectedValue PrepareDelete(int cf, int64_t key, bool* prepared) {
-    return expected_state_manager_->PrepareDelete(cf, key, prepared);
+  PendingExpectedValue PrepareDelete(int cf, int64_t key) {
+    return expected_state_manager_->PrepareDelete(cf, key);
   }

   // Requires external locking covering `key` in `cf` to prevent concurrent
   // write or delete to the same `key`.
-  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key,
-                                           bool* prepared) {
-    return expected_state_manager_->PrepareSingleDelete(cf, key, prepared);
+  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key) {
+    return expected_state_manager_->PrepareSingleDelete(cf, key);
   }

   // Requires external locking covering keys in `[begin_key, end_key)` in `cf`
   // to prevent concurrent write or delete to the same `key`.
   std::vector<PendingExpectedValue> PrepareDeleteRange(int cf,
                                                        int64_t begin_key,
-                                                       int64_t end_key,
-                                                       bool* prepared) {
-    return expected_state_manager_->PrepareDeleteRange(cf, begin_key, end_key,
-                                                       prepared);
+                                                       int64_t end_key) {
+    return expected_state_manager_->PrepareDeleteRange(cf, begin_key, end_key);
   }

   bool AllowsOverwrite(int64_t key) const {

@@ -632,10 +632,8 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
   for (auto cfh : column_families_) {
     for (int64_t k = 0; k != number_of_keys; ++k) {
       const std::string key = Key(k);
-      bool prepare = false;
       PendingExpectedValue pending_expected_value =
-          shared->PreparePut(cf_idx, k, &prepare);
-      assert(prepare);
+          shared->PreparePut(cf_idx, k);
       const uint32_t value_base = pending_expected_value.GetFinalValueBase();
       const size_t sz = GenerateValue(value_base, value, sizeof(value));

@@ -64,6 +64,42 @@ class StressTest {
     }
   }

+  void UpdateIfInitialWriteFails(Env* db_stress_env, const Status& write_s,
+                                 Status* initial_write_s,
+                                 bool* initial_wal_write_may_succeed,
+                                 uint64_t* wait_for_recover_start_time) {
+    assert(db_stress_env && initial_write_s && initial_wal_write_may_succeed &&
+           wait_for_recover_start_time);
+    // Only update `initial_write_s`, `initial_wal_write_may_succeed` when the
+    // first write fails
+    if (!write_s.ok() && (*initial_write_s).ok()) {
+      *initial_write_s = write_s;
+      *initial_wal_write_may_succeed =
+          !FaultInjectionTestFS::IsFailedToWriteToWALError(*initial_write_s);
+      *wait_for_recover_start_time = db_stress_env->NowMicros();
+    }
+  }
+
+  void PrintWriteRecoveryWaitTimeIfNeeded(Env* db_stress_env,
+                                          const Status& initial_write_s,
+                                          bool initial_wal_write_may_succeed,
+                                          uint64_t wait_for_recover_start_time,
+                                          const std::string& thread_name) {
+    assert(db_stress_env);
+    bool waited_for_recovery = !initial_write_s.ok() &&
+                               IsErrorInjectedAndRetryable(initial_write_s) &&
+                               initial_wal_write_may_succeed;
+    if (waited_for_recovery) {
+      uint64_t elapsed_sec =
+          (db_stress_env->NowMicros() - wait_for_recover_start_time) / 1000000;
+      if (elapsed_sec > 10) {
+        fprintf(stdout,
+                "%s thread slept to wait for write recovery for "
+                "%" PRIu64 " seconds\n",
+                thread_name.c_str(), elapsed_sec);
+      }
+    }
+  }
   void GetDeleteRangeKeyLocks(
       ThreadState* thread, int rand_column_family, int64_t rand_key,
       std::vector<std::unique_ptr<MutexLock>>* range_locks) {

@@ -32,41 +32,29 @@ void ExpectedState::Precommit(int cf, int64_t key, const ExpectedValue& value) {
   std::atomic_thread_fence(std::memory_order_release);
 }

-PendingExpectedValue ExpectedState::PreparePut(int cf, int64_t key,
-                                               bool* prepared) {
-  assert(prepared);
+PendingExpectedValue ExpectedState::PreparePut(int cf, int64_t key) {
   ExpectedValue expected_value = Load(cf, key);

   // Calculate the original expected value
   const ExpectedValue orig_expected_value = expected_value;

   // Calculate the pending expected value
-  bool res = expected_value.Put(true /* pending */);
-  if (!res) {
-    PendingExpectedValue ret = PendingExpectedValue(
-        &Value(cf, key), orig_expected_value, orig_expected_value);
-    *prepared = false;
-    return ret;
-  }
+  expected_value.Put(true /* pending */);
   const ExpectedValue pending_expected_value = expected_value;

   // Calculate the final expected value
-  res = expected_value.Put(false /* pending */);
-  assert(res);
+  expected_value.Put(false /* pending */);
   const ExpectedValue final_expected_value = expected_value;

   // Precommit
   Precommit(cf, key, pending_expected_value);
-  *prepared = true;
   return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                               final_expected_value);
 }

 ExpectedValue ExpectedState::Get(int cf, int64_t key) { return Load(cf, key); }

-PendingExpectedValue ExpectedState::PrepareDelete(int cf, int64_t key,
-                                                  bool* prepared) {
-  assert(prepared);
+PendingExpectedValue ExpectedState::PrepareDelete(int cf, int64_t key) {
   ExpectedValue expected_value = Load(cf, key);

   // Calculate the original expected value

@@ -77,47 +65,32 @@ PendingExpectedValue ExpectedState::PrepareDelete(int cf, int64_t key,
   if (!res) {
     PendingExpectedValue ret = PendingExpectedValue(
         &Value(cf, key), orig_expected_value, orig_expected_value);
-    *prepared = false;
     return ret;
   }
   const ExpectedValue pending_expected_value = expected_value;

   // Calculate the final expected value
-  res = expected_value.Delete(false /* pending */);
-  assert(res);
+  expected_value.Delete(false /* pending */);
   const ExpectedValue final_expected_value = expected_value;

   // Precommit
   Precommit(cf, key, pending_expected_value);
-  *prepared = true;
   return PendingExpectedValue(&Value(cf, key), orig_expected_value,
                               final_expected_value);
 }

-PendingExpectedValue ExpectedState::PrepareSingleDelete(int cf, int64_t key,
-                                                        bool* prepared) {
-  return PrepareDelete(cf, key, prepared);
+PendingExpectedValue ExpectedState::PrepareSingleDelete(int cf, int64_t key) {
+  return PrepareDelete(cf, key);
 }

 std::vector<PendingExpectedValue> ExpectedState::PrepareDeleteRange(
-    int cf, int64_t begin_key, int64_t end_key, bool* prepared) {
+    int cf, int64_t begin_key, int64_t end_key) {
   std::vector<PendingExpectedValue> pending_expected_values;
-  bool has_prepared_failed = false;

   for (int64_t key = begin_key; key < end_key; ++key) {
-    bool each_prepared = false;
-    PendingExpectedValue pending_expected_value =
-        PrepareDelete(cf, key, &each_prepared);
-    if (each_prepared) {
-      pending_expected_values.push_back(pending_expected_value);
-    } else {
-      has_prepared_failed = true;
-      pending_expected_value.PermitUnclosedPendingState();
-      break;
-    }
+    pending_expected_values.push_back(PrepareDelete(cf, key));
   }

-  *prepared = !has_prepared_failed;
   return pending_expected_values;
 }

@@ -44,7 +44,7 @@ class ExpectedState {
   //
   // Requires external locking covering `key` in `cf` to prevent concurrent
   // write or delete to the same `key`.
-  PendingExpectedValue PreparePut(int cf, int64_t key, bool* prepared);
+  PendingExpectedValue PreparePut(int cf, int64_t key);

   // Does not requires external locking.
   ExpectedValue Get(int cf, int64_t key);

@@ -55,18 +55,17 @@ class ExpectedState {
   //
   // Requires external locking covering `key` in `cf` to prevent concurrent
   // write or delete to the same `key`.
-  PendingExpectedValue PrepareDelete(int cf, int64_t key, bool* prepared);
+  PendingExpectedValue PrepareDelete(int cf, int64_t key);

   // Requires external locking covering `key` in `cf` to prevent concurrent
   // write or delete to the same `key`.
-  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key, bool* prepared);
+  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key);

   // Requires external locking covering keys in `[begin_key, end_key)` in `cf`
   // to prevent concurrent write or delete to the same `key`.
   std::vector<PendingExpectedValue> PrepareDeleteRange(int cf,
                                                        int64_t begin_key,
-                                                       int64_t end_key,
-                                                       bool* prepared);
+                                                       int64_t end_key);

   // Update the expected value for start of an incomplete write or delete
   // operation on the key assoicated with this expected value

@@ -197,30 +196,28 @@ class ExpectedStateManager {
   void ClearColumnFamily(int cf) { return latest_->ClearColumnFamily(cf); }

   // See ExpectedState::PreparePut()
-  PendingExpectedValue PreparePut(int cf, int64_t key, bool* prepared) {
-    return latest_->PreparePut(cf, key, prepared);
+  PendingExpectedValue PreparePut(int cf, int64_t key) {
+    return latest_->PreparePut(cf, key);
   }

   // See ExpectedState::Get()
   ExpectedValue Get(int cf, int64_t key) { return latest_->Get(cf, key); }

   // See ExpectedState::PrepareDelete()
-  PendingExpectedValue PrepareDelete(int cf, int64_t key, bool* prepared) {
-    return latest_->PrepareDelete(cf, key, prepared);
+  PendingExpectedValue PrepareDelete(int cf, int64_t key) {
+    return latest_->PrepareDelete(cf, key);
   }

   // See ExpectedState::PrepareSingleDelete()
-  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key,
-                                           bool* prepared) {
-    return latest_->PrepareSingleDelete(cf, key, prepared);
+  PendingExpectedValue PrepareSingleDelete(int cf, int64_t key) {
+    return latest_->PrepareSingleDelete(cf, key);
   }

   // See ExpectedState::PrepareDeleteRange()
   std::vector<PendingExpectedValue> PrepareDeleteRange(int cf,
                                                        int64_t begin_key,
-                                                       int64_t end_key,
-                                                       bool* prepared) {
-    return latest_->PrepareDeleteRange(cf, begin_key, end_key, prepared);
+                                                       int64_t end_key) {
+    return latest_->PrepareDeleteRange(cf, begin_key, end_key);
   }

   // See ExpectedState::Exists()

@@ -10,11 +10,7 @@
 #include <atomic>

 namespace ROCKSDB_NAMESPACE {
-bool ExpectedValue::Put(bool pending) {
-  if (pending && (PendingWrite() || PendingDelete())) {
-    return false;
-  }
-
+void ExpectedValue::Put(bool pending) {
   if (pending) {
     SetPendingWrite();
   } else {

@@ -22,14 +18,9 @@ bool ExpectedValue::Put(bool pending) {
     ClearDeleted();
     ClearPendingWrite();
   }
-  return true;
 }

 bool ExpectedValue::Delete(bool pending) {
-  if (pending && (PendingWrite() || PendingDelete())) {
-    return false;
-  }
-
   if (!Exists()) {
     return false;
   }

@@ -37,11 +37,14 @@ class ExpectedValue {
   explicit ExpectedValue(uint32_t expected_value)
       : expected_value_(expected_value) {}

-  bool Exists() const { return PendingWrite() || !IsDeleted(); }
+  bool Exists() const {
+    assert(!PendingWrite() && !PendingDelete());
+    return !IsDeleted();
+  }

   uint32_t Read() const { return expected_value_; }

-  bool Put(bool pending);
+  void Put(bool pending);

   bool Delete(bool pending);

@@ -1619,28 +1619,21 @@ class NonBatchedOpsStressTest : public StressTest {
     // write
     bool initial_wal_write_may_succeed = true;

-    bool prepared = false;
     PendingExpectedValue pending_expected_value =
-        shared->PreparePut(rand_column_family, rand_key, &prepared);
-    if (!prepared) {
-      pending_expected_value.PermitUnclosedPendingState();
-      return s;
-    }
+        shared->PreparePut(rand_column_family, rand_key);

     const uint32_t value_base = pending_expected_value.GetFinalValueBase();
     const size_t sz = GenerateValue(value_base, value, sizeof(value));
     const Slice v(value, sz);

+    uint64_t wait_for_recover_start_time = 0;
     do {
       // In order to commit the expected state for the initial write failed with
       // injected retryable error and successful WAL write, retry the write
       // until it succeeds after the recovery finishes
       if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
           initial_wal_write_may_succeed) {
-        lock.reset();
         std::this_thread::sleep_for(std::chrono::microseconds(1 * 1000 * 1000));
-        lock.reset(new MutexLock(
-            shared->GetMutexForKey(rand_column_family, rand_key)));
       }
       if (FLAGS_use_put_entity_one_in > 0 &&
           (value_base % FLAGS_use_put_entity_one_in) == 0) {

@@ -1691,13 +1684,10 @@ class NonBatchedOpsStressTest : public StressTest {
             });
         }
       }
-      // Only update `initial_write_s`, `initial_wal_write_may_succeed` when the
-      // first write fails
-      if (!s.ok() && initial_write_s.ok()) {
-        initial_write_s = s;
-        initial_wal_write_may_succeed =
-            !FaultInjectionTestFS::IsFailedToWriteToWALError(initial_write_s);
-      }
+      UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
+                                &initial_wal_write_may_succeed,
+                                &wait_for_recover_start_time);
+
     } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
              initial_wal_write_may_succeed);

@@ -1719,6 +1709,9 @@ class NonBatchedOpsStressTest : public StressTest {
         thread->shared->SafeTerminate();
       }
     } else {
+      PrintWriteRecoveryWaitTimeIfNeeded(
+          db_stress_env, initial_write_s, initial_wal_write_may_succeed,
+          wait_for_recover_start_time, "TestPut");
       pending_expected_value.Commit();
       thread->stats.AddBytesForWrites(1, sz);
       PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,

@@ -1756,25 +1749,18 @@ class NonBatchedOpsStressTest : public StressTest {
     // Use delete if the key may be overwritten and a single deletion
     // otherwise.
     if (shared->AllowsOverwrite(rand_key)) {
-      bool prepared = false;
       PendingExpectedValue pending_expected_value =
-          shared->PrepareDelete(rand_column_family, rand_key, &prepared);
-      if (!prepared) {
-        pending_expected_value.PermitUnclosedPendingState();
-        return s;
-      }
+          shared->PrepareDelete(rand_column_family, rand_key);

+      uint64_t wait_for_recover_start_time = 0;
       do {
         // In order to commit the expected state for the initial write failed
         // with injected retryable error and successful WAL write, retry the
         // write until it succeeds after the recovery finishes
         if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
             initial_wal_write_may_succeed) {
-          lock.reset();
           std::this_thread::sleep_for(
               std::chrono::microseconds(1 * 1000 * 1000));
-          lock.reset(new MutexLock(
-              shared->GetMutexForKey(rand_column_family, rand_key)));
         }
         if (!FLAGS_use_txn) {
           if (FLAGS_user_timestamp_size == 0) {

@@ -1787,13 +1773,9 @@ class NonBatchedOpsStressTest : public StressTest {
               return txn.Delete(cfh, key);
             });
         }
-        // Only update `initial_write_s`, `initial_wal_write_may_succeed` when
-        // the first write fails
-        if (!s.ok() && initial_write_s.ok()) {
-          initial_write_s = s;
-          initial_wal_write_may_succeed =
-              !FaultInjectionTestFS::IsFailedToWriteToWALError(initial_write_s);
-        }
+        UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
+                                  &initial_wal_write_may_succeed,
+                                  &wait_for_recover_start_time);
       } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
                initial_wal_write_may_succeed);

@@ -1816,29 +1798,25 @@ class NonBatchedOpsStressTest : public StressTest {
          thread->shared->SafeTerminate();
        }
      } else {
+        PrintWriteRecoveryWaitTimeIfNeeded(
+            db_stress_env, initial_write_s, initial_wal_write_may_succeed,
+            wait_for_recover_start_time, "TestDelete");
        pending_expected_value.Commit();
        thread->stats.AddDeletes(1);
      }
    } else {
-      bool prepared = false;
      PendingExpectedValue pending_expected_value =
-          shared->PrepareSingleDelete(rand_column_family, rand_key, &prepared);
-      if (!prepared) {
-        pending_expected_value.PermitUnclosedPendingState();
-        return s;
-      }
+          shared->PrepareSingleDelete(rand_column_family, rand_key);

+      uint64_t wait_for_recover_start_time = 0;
      do {
        // In order to commit the expected state for the initial write failed
        // with injected retryable error and successful WAL write, retry the
        // write until it succeeds after the recovery finishes
        if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
            initial_wal_write_may_succeed) {
-          lock.reset();
          std::this_thread::sleep_for(
              std::chrono::microseconds(1 * 1000 * 1000));
-          lock.reset(new MutexLock(
-              shared->GetMutexForKey(rand_column_family, rand_key)));
        }
        if (!FLAGS_use_txn) {
          if (FLAGS_user_timestamp_size == 0) {

@@ -1851,13 +1829,9 @@ class NonBatchedOpsStressTest : public StressTest {
              return txn.SingleDelete(cfh, key);
            });
        }
-        // Only update `initial_write_s`, `initial_wal_write_may_succeed` when
-        // the first write fails
-        if (!s.ok() && initial_write_s.ok()) {
-          initial_write_s = s;
-          initial_wal_write_may_succeed =
-              !FaultInjectionTestFS::IsFailedToWriteToWALError(initial_write_s);
-        }
+        UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
+                                  &initial_wal_write_may_succeed,
+                                  &wait_for_recover_start_time);
      } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
               initial_wal_write_may_succeed);

@@ -1880,6 +1854,9 @@ class NonBatchedOpsStressTest : public StressTest {
        thread->shared->SafeTerminate();
      }
    } else {
+      PrintWriteRecoveryWaitTimeIfNeeded(
+          db_stress_env, initial_write_s, initial_wal_write_may_succeed,
+          wait_for_recover_start_time, "TestDelete");
      pending_expected_value.Commit();
      thread->stats.AddSingleDeletes(1);
    }

@@ -1914,18 +1891,9 @@ class NonBatchedOpsStressTest : public StressTest {
     // write
     bool initial_wal_write_may_succeed = true;

-    bool prepared = false;
     std::vector<PendingExpectedValue> pending_expected_values =
         shared->PrepareDeleteRange(rand_column_family, rand_key,
-                                   rand_key + FLAGS_range_deletion_width,
-                                   &prepared);
-    if (!prepared) {
-      for (PendingExpectedValue& pending_expected_value :
-           pending_expected_values) {
-        pending_expected_value.PermitUnclosedPendingState();
-      }
-      return s;
-    }
+                                   rand_key + FLAGS_range_deletion_width);

     const int covered = static_cast<int>(pending_expected_values.size());
     std::string keystr = Key(rand_key);

@@ -1935,6 +1903,7 @@ class NonBatchedOpsStressTest : public StressTest {
     Slice end_key = end_keystr;
     std::string write_ts_str;
     Slice write_ts;
+    uint64_t wait_for_recover_start_time = 0;

     do {
       // In order to commit the expected state for the initial write failed with

@@ -1942,10 +1911,7 @@ class NonBatchedOpsStressTest : public StressTest {
       // until it succeeds after the recovery finishes
       if (!s.ok() && IsErrorInjectedAndRetryable(s) &&
           initial_wal_write_may_succeed) {
-        range_locks.clear();
         std::this_thread::sleep_for(std::chrono::microseconds(1 * 1000 * 1000));
-        GetDeleteRangeKeyLocks(thread, rand_column_family, rand_key,
-                               &range_locks);
       }
       if (FLAGS_user_timestamp_size) {
         write_ts_str = GetNowNanos();

@@ -1954,13 +1920,9 @@ class NonBatchedOpsStressTest : public StressTest {
       } else {
         s = db_->DeleteRange(write_opts, cfh, key, end_key);
       }
-      // Only update `initial_write_s`, `initial_wal_write_may_succeed` when the
-      // first write fails
-      if (!s.ok() && initial_write_s.ok()) {
-        initial_write_s = s;
-        initial_wal_write_may_succeed =
-            !FaultInjectionTestFS::IsFailedToWriteToWALError(initial_write_s);
-      }
+      UpdateIfInitialWriteFails(db_stress_env, s, &initial_write_s,
+                                &initial_wal_write_may_succeed,
+                                &wait_for_recover_start_time);
     } while (!s.ok() && IsErrorInjectedAndRetryable(s) &&
              initial_wal_write_may_succeed);

@@ -1985,6 +1947,9 @@ class NonBatchedOpsStressTest : public StressTest {
         thread->shared->SafeTerminate();
       }
     } else {
+      PrintWriteRecoveryWaitTimeIfNeeded(
+          db_stress_env, initial_write_s, initial_wal_write_may_succeed,
+          wait_for_recover_start_time, "TestDeleteRange");
       for (PendingExpectedValue& pending_expected_value :
            pending_expected_values) {
         pending_expected_value.Commit();

@@ -2057,16 +2022,8 @@ class NonBatchedOpsStressTest : public StressTest {
       }
       keys.push_back(key);

-      bool prepared = false;
       PendingExpectedValue pending_expected_value =
-          shared->PreparePut(column_family, key, &prepared);
-      if (!prepared) {
-        pending_expected_value.PermitUnclosedPendingState();
-        for (PendingExpectedValue& pev : pending_expected_values) {
-          pev.PermitUnclosedPendingState();
-        }
-        return;
-      }
+          shared->PreparePut(column_family, key);

       const uint32_t value_base = pending_expected_value.GetFinalValueBase();
       values.push_back(value_base);