rocksdb/tools/db_crashtest.py
Hui Xiao aed30ddf21 Support WriteCommit policy with sync_fault_injection=1 (#10624)
Summary:
**Context:**
Prior to this PR, correctness testing with un-sync data loss [disabled](https://github.com/facebook/rocksdb/pull/10605) transaction (`use_txn=1`) thus all of the `txn_write_policy` . This PR improved that by adding support for one policy - WriteCommit (`txn_write_policy=0`).

**Summary:**
They key to this support is (a) handle Mark{Begin, End}Prepare/MarkCommit/MarkRollback in constructing ExpectedState under WriteCommit policy correctly and (b) monitor CI jobs and solve any test incompatibility issue till jobs are stable. (b) will be part of the test plan.

For (a)
- During prepare (i.e, between `MarkBeginPrepare()` and `MarkEndPrepare(xid)`), `ExpectedStateTraceRecordHandler` will buffer all writes by adding all writes to an internal `WriteBatch`.
- On `MarkEndPrepare()`, that `WriteBatch` will be associated with the transaction's `xid`.
- During the commit (i.e, on `MarkCommit(xid)`), `ExpectedStateTraceRecordHandler` will retrieve and iterate the internal `WriteBatch` and finally apply those writes to `ExpectedState`
- During the rollback (i.e, on `MarkRollback(xid)`), `ExpectedStateTraceRecordHandler` will erase the internal `WriteBatch` from the map.

For (b) - one major issue described below:
- TransactionsDB in db stress recovers prepared-but-not-committed txns from the previous crashed run by randomly committing or rolling back it at the start of the current run, see a historical [PR](6d06be22c0) predated correctness testing.
- And we will verify those processed keys in a recovered db against their expected state.
- However since now we turn on `sync_fault_injection=1` where the expected state is constructed from the trace instead of using the LATEST.state from previous run. The expected state now used to verify those processed keys won't contain UNKNOWN_SENTINEL as they should - see test 1 for a failed case.
- Therefore, we decided to manually update its expected state to be UNKNOWN_SENTINEL as part of the processing.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10624

Test Plan:
1. Test exposed the major issue described above. This test will fail without setting UNKNOWN_SENTINEL in expected state during the processing and pass after
```
db=/dev/shm/rocksdb_crashtest_blackbox
exp=/dev/shm/rocksdb_crashtest_expected
dbt=$db.tmp
expt=$exp.tmp

rm -rf $db $exp
mkdir -p $exp

echo "RUN 1"
./db_stress \
--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 &
pid=$!
sleep 0.2
sleep 20
kill $pid
sleep 0.2

echo "RUN 2"
./db_stress \
--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 &
pid=$!
sleep 0.2
sleep 20
kill $pid
sleep 0.2

echo "RUN 3"
./db_stress \
--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1
```

2. Manual testing to ensure ExpectedState is constructed correctly during recovery by verifying it against previously crashed TransactionDB's WAL.
   - Run the following command to crash a TransactionDB with WriteCommit policy. Then `./ldb dump_wal` on its WAL file
```
db=/dev/shm/rocksdb_crashtest_blackbox
exp=/dev/shm/rocksdb_crashtest_expected
rm -rf $db $exp
mkdir -p $exp

./db_stress \
	--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
	--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1 &
pid=$!
sleep 30
kill $pid
sleep 1
```
- Run the following command to verify recovery of the crashed db under debugger. Compare the step-wise result with WAL records (e.g, WriteBatch content, xid, prepare/commit/rollback marker)
```
   ./db_stress \
	--clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=0 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --test_batches_snapshots=0 --value_size_mult=32 --writepercent=90 \
	--use_txn=1 --txn_write_policy=0 --sync_fault_injection=1
```
3. Automatic testing by triggering all RocksDB stress/crash test jobs for 3 rounds with no failure.

Reviewed By: ajkr, riversand963

Differential Revision: D39199373

Pulled By: hx235

fbshipit-source-id: 7a1dec0e3e2ee6ea86ddf5dd19ceb5543a3d6f0c
2022-09-26 18:01:59 -07:00

971 lines
38 KiB
Python

#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
from __future__ import absolute_import, division, print_function, unicode_literals
import argparse
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time
# params overwrite priority:
# for default:
# default_params < {blackbox,whitebox}_default_params < args
# for simple:
# default_params < {blackbox,whitebox}_default_params <
# simple_default_params <
# {blackbox,whitebox}_simple_default_params < args
# for cf_consistency:
# default_params < {blackbox,whitebox}_default_params <
# cf_consistency_params < args
# for txn:
# default_params < {blackbox,whitebox}_default_params < txn_params < args
# for ts:
# default_params < {blackbox,whitebox}_default_params < ts_params < args
# for multiops_txn:
# default_params < {blackbox,whitebox}_default_params < multiops_txn_params < args
default_params = {
"acquire_snapshot_one_in": 10000,
"backup_max_size": 100 * 1024 * 1024,
# Consider larger number when backups considered more stable
"backup_one_in": 100000,
"batch_protection_bytes_per_key": lambda: random.choice([0, 8]),
"memtable_protection_bytes_per_key": lambda: random.choice([0, 1, 2, 4, 8]),
"block_size": 16384,
"bloom_bits": lambda: random.choice(
[random.randint(0, 19), random.lognormvariate(2.3, 1.3)]
),
"cache_index_and_filter_blocks": lambda: random.randint(0, 1),
"cache_size": 8388608,
"charge_compression_dictionary_building_buffer": lambda: random.choice([0, 1]),
"charge_filter_construction": lambda: random.choice([0, 1]),
"charge_table_reader": lambda: random.choice([0, 1]),
"charge_file_metadata": lambda: random.choice([0, 1]),
"checkpoint_one_in": 1000000,
"compression_type": lambda: random.choice(
["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]
),
"bottommost_compression_type": lambda: "disable"
if random.randint(0, 1) == 0
else random.choice(["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]),
"checksum_type": lambda: random.choice(
["kCRC32c", "kxxHash", "kxxHash64", "kXXH3"]
),
"compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
"compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
# Disabled compression_parallel_threads as the feature is not stable
# lambda: random.choice([1] * 9 + [4])
"compression_parallel_threads": 1,
"compression_max_dict_buffer_bytes": lambda: (1 << random.randint(0, 40)) - 1,
"compression_use_zstd_dict_trainer": lambda: random.randint(0, 1),
"clear_column_family_one_in": 0,
"compact_files_one_in": 1000000,
"compact_range_one_in": 1000000,
"compaction_pri": random.randint(0, 4),
"data_block_index_type": lambda: random.choice([0, 1]),
"delpercent": 4,
"delrangepercent": 1,
"destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1),
"enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
"expected_values_dir": lambda: setup_expected_values_dir(),
"fail_if_options_file_error": lambda: random.randint(0, 1),
"flush_one_in": 1000000,
"file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
"get_live_files_one_in": 1000000,
# Note: the following two are intentionally disabled as the corresponding
# APIs are not guaranteed to succeed.
"get_sorted_wal_files_one_in": 0,
"get_current_wal_file_one_in": 0,
# Temporarily disable hash index
"index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]),
"ingest_external_file_one_in": 1000000,
"iterpercent": 10,
"mark_for_compaction_one_file_in": lambda: 10 * random.randint(0, 1),
"max_background_compactions": 20,
"max_bytes_for_level_base": 10485760,
"max_key": 25000000,
"max_write_buffer_number": 3,
"mmap_read": lambda: random.randint(0, 1),
# Setting `nooverwritepercent > 0` is only possible because we do not vary
# the random seed, so the same keys are chosen by every run for disallowing
# overwrites.
"nooverwritepercent": 1,
"open_files": lambda: random.choice([-1, -1, 100, 500000]),
"optimize_filters_for_memory": lambda: random.randint(0, 1),
"partition_filters": lambda: random.randint(0, 1),
"partition_pinning": lambda: random.randint(0, 3),
"pause_background_one_in": 1000000,
"prefix_size": lambda: random.choice([-1, 1, 5, 7, 8]),
"prefixpercent": 5,
"progress_reports": 0,
"readpercent": 45,
"recycle_log_file_num": lambda: random.randint(0, 1),
"snapshot_hold_ops": 100000,
"sst_file_manager_bytes_per_sec": lambda: random.choice([0, 104857600]),
"sst_file_manager_bytes_per_truncate": lambda: random.choice([0, 1048576]),
"long_running_snapshots": lambda: random.randint(0, 1),
"subcompactions": lambda: random.randint(1, 4),
"target_file_size_base": 2097152,
"target_file_size_multiplier": 2,
"test_batches_snapshots": lambda: random.randint(0, 1),
"top_level_index_pinning": lambda: random.randint(0, 3),
"unpartitioned_pinning": lambda: random.randint(0, 3),
"use_direct_reads": lambda: random.randint(0, 1),
"use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
"mock_direct_io": False,
"cache_type": lambda: random.choice(["lru_cache", "hyper_clock_cache"]),
# fast_lru_cache is incompatible with stress tests, because it doesn't support strict_capacity_limit == false.
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
# 999 -> use Bloom API
"ribbon_starting_level": lambda: random.choice([random.randint(-1, 10), 999]),
"value_size_mult": 32,
"verify_checksum": 1,
"write_buffer_size": 4 * 1024 * 1024,
"writepercent": 35,
"format_version": lambda: random.choice([2, 3, 4, 5, 5]),
"index_block_restart_interval": lambda: random.choice(range(1, 16)),
"use_multiget": lambda: random.randint(0, 1),
"periodic_compaction_seconds": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
"compaction_ttl": lambda: random.choice([0, 0, 1, 2, 10, 100, 1000]),
# Test small max_manifest_file_size in a smaller chance, as most of the
# time we wnat manifest history to be preserved to help debug
"max_manifest_file_size": lambda: random.choice(
[t * 16384 if t < 3 else 1024 * 1024 * 1024 for t in range(1, 30)]
),
# Sync mode might make test runs slower so running it in a smaller chance
"sync": lambda: random.choice([1 if t == 0 else 0 for t in range(0, 20)]),
"bytes_per_sync": lambda: random.choice([0, 262144]),
"wal_bytes_per_sync": lambda: random.choice([0, 524288]),
# Disable compaction_readahead_size because the test is not passing.
# "compaction_readahead_size" : lambda : random.choice(
# [0, 0, 1024 * 1024]),
"db_write_buffer_size": lambda: random.choice(
[0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]
),
"avoid_unnecessary_blocking_io": random.randint(0, 1),
"write_dbid_to_manifest": random.randint(0, 1),
"avoid_flush_during_recovery": lambda: random.choice(
[1 if t == 0 else 0 for t in range(0, 8)]
),
"max_write_batch_group_size_bytes": lambda: random.choice(
[16, 64, 1024 * 1024, 16 * 1024 * 1024]
),
"level_compaction_dynamic_level_bytes": True,
"verify_checksum_one_in": 1000000,
"verify_db_one_in": 100000,
"continuous_verification_interval": 0,
"max_key_len": 3,
"key_len_percent_dist": "1,30,69",
"read_fault_one_in": lambda: random.choice([0, 32, 1000]),
"open_metadata_write_fault_one_in": lambda: random.choice([0, 0, 8]),
"open_write_fault_one_in": lambda: random.choice([0, 0, 16]),
"open_read_fault_one_in": lambda: random.choice([0, 0, 32]),
"sync_fault_injection": lambda: random.randint(0, 1),
"get_property_one_in": 1000000,
"paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
"max_write_buffer_size_to_maintain": lambda: random.choice(
[0, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024]
),
"user_timestamp_size": 0,
"secondary_cache_fault_one_in": lambda: random.choice([0, 0, 32]),
"prepopulate_block_cache": lambda: random.choice([0, 1]),
"memtable_prefix_bloom_size_ratio": lambda: random.choice([0.001, 0.01, 0.1, 0.5]),
"memtable_whole_key_filtering": lambda: random.randint(0, 1),
"detect_filter_construct_corruption": lambda: random.choice([0, 1]),
"adaptive_readahead": lambda: random.choice([0, 1]),
"async_io": lambda: random.choice([0, 1]),
"wal_compression": lambda: random.choice(["none", "zstd"]),
"verify_sst_unique_id_in_manifest": 1, # always do unique_id verification
"secondary_cache_uri": lambda: random.choice(
[
"",
"compressed_secondary_cache://capacity=8388608",
"compressed_secondary_cache://capacity=8388608;enable_custom_split_merge=true",
]
),
"allow_data_in_errors": True,
"readahead_size": lambda: random.choice([0, 16384, 524288]),
"initial_auto_readahead_size": lambda: random.choice([0, 16384, 524288]),
"max_auto_readahead_size": lambda: random.choice([0, 16384, 524288]),
"num_file_reads_for_auto_readahead": lambda: random.choice([0, 1, 2]),
}
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
_DEBUG_LEVEL_ENV_VAR = "DEBUG_LEVEL"
stress_cmd = "./db_stress"
def is_release_mode():
return os.environ.get(_DEBUG_LEVEL_ENV_VAR) == "0"
def get_dbname(test_name):
test_dir_name = "rocksdb_crashtest_" + test_name
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is None or test_tmpdir == "":
dbname = tempfile.mkdtemp(prefix=test_dir_name)
else:
dbname = test_tmpdir + "/" + test_dir_name
shutil.rmtree(dbname, True)
os.mkdir(dbname)
return dbname
expected_values_dir = None
def setup_expected_values_dir():
global expected_values_dir
if expected_values_dir is not None:
return expected_values_dir
expected_dir_prefix = "rocksdb_crashtest_expected_"
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is None or test_tmpdir == "":
expected_values_dir = tempfile.mkdtemp(prefix=expected_dir_prefix)
else:
# if tmpdir is specified, store the expected_values_dir under that dir
expected_values_dir = test_tmpdir + "/rocksdb_crashtest_expected"
if os.path.exists(expected_values_dir):
shutil.rmtree(expected_values_dir)
os.mkdir(expected_values_dir)
return expected_values_dir
multiops_txn_key_spaces_file = None
def setup_multiops_txn_key_spaces_file():
global multiops_txn_key_spaces_file
if multiops_txn_key_spaces_file is not None:
return multiops_txn_key_spaces_file
key_spaces_file_prefix = "rocksdb_crashtest_multiops_txn_key_spaces"
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is None or test_tmpdir == "":
multiops_txn_key_spaces_file = tempfile.mkstemp(prefix=key_spaces_file_prefix)[
1
]
else:
if not os.path.exists(test_tmpdir):
os.mkdir(test_tmpdir)
multiops_txn_key_spaces_file = tempfile.mkstemp(
prefix=key_spaces_file_prefix, dir=test_tmpdir
)[1]
return multiops_txn_key_spaces_file
def is_direct_io_supported(dbname):
with tempfile.NamedTemporaryFile(dir=dbname) as f:
try:
os.open(f.name, os.O_DIRECT)
except BaseException:
return False
return True
blackbox_default_params = {
"disable_wal": lambda: random.choice([0, 0, 0, 1]),
# total time for this script to test db_stress
"duration": 6000,
# time for one db_stress instance to run
"interval": 120,
# since we will be killing anyway, use large value for ops_per_thread
"ops_per_thread": 100000000,
"reopen": 0,
"set_options_one_in": 10000,
}
whitebox_default_params = {
# TODO: enable this once we figure out how to adjust kill odds for WAL-
# disabled runs, and either (1) separate full `db_stress` runs out of
# whitebox crash or (2) support verification at end of `db_stress` runs
# that ran with WAL disabled.
"disable_wal": 0,
"duration": 10000,
"log2_keys_per_lock": 10,
"ops_per_thread": 200000,
"random_kill_odd": 888887,
"reopen": 20,
}
simple_default_params = {
"allow_concurrent_memtable_write": lambda: random.randint(0, 1),
"column_families": 1,
# TODO: re-enable once internal task T124324915 is fixed.
# "experimental_mempurge_threshold": lambda: 10.0*random.random(),
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",
"target_file_size_base": 16777216,
"target_file_size_multiplier": 1,
"test_batches_snapshots": 0,
"write_buffer_size": 32 * 1024 * 1024,
"level_compaction_dynamic_level_bytes": False,
"paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
"verify_iterator_with_expected_state_one_in": 5, # this locks a range of keys
}
blackbox_simple_default_params = {
"open_files": -1,
"set_options_one_in": 0,
}
whitebox_simple_default_params = {}
cf_consistency_params = {
"disable_wal": lambda: random.randint(0, 1),
"reopen": 0,
"test_cf_consistency": 1,
# use small value for write_buffer_size so that RocksDB triggers flush
# more frequently
"write_buffer_size": 1024 * 1024,
"enable_pipelined_write": lambda: random.randint(0, 1),
# Snapshots are used heavily in this test mode, while they are incompatible
# with compaction filter.
"enable_compaction_filter": 0,
# `CfConsistencyStressTest::TestIngestExternalFile()` is not implemented.
"ingest_external_file_one_in": 0,
}
txn_params = {
"use_txn": 1,
# Avoid lambda to set it once for the entire test
"txn_write_policy": random.randint(0, 2),
"unordered_write": random.randint(0, 1),
# TODO: there is such a thing as transactions with WAL disabled. We should
# cover that case.
"disable_wal": 0,
# OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns
"checkpoint_one_in": 0,
# pipeline write is not currnetly compatible with WritePrepared txns
"enable_pipelined_write": 0,
"create_timestamped_snapshot_one_in": random.choice([0, 20]),
}
best_efforts_recovery_params = {
"best_efforts_recovery": 1,
"atomic_flush": 0,
"disable_wal": 1,
"column_families": 1,
}
blob_params = {
"allow_setting_blob_options_dynamically": 1,
# Enable blob files and GC with a 75% chance initially; note that they might still be
# enabled/disabled during the test via SetOptions
"enable_blob_files": lambda: random.choice([0] + [1] * 3),
"min_blob_size": lambda: random.choice([0, 8, 16]),
"blob_file_size": lambda: random.choice([1048576, 16777216, 268435456, 1073741824]),
"blob_compression_type": lambda: random.choice(["none", "snappy", "lz4", "zstd"]),
"enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3),
"blob_garbage_collection_age_cutoff": lambda: random.choice(
[0.0, 0.25, 0.5, 0.75, 1.0]
),
"blob_garbage_collection_force_threshold": lambda: random.choice([0.5, 0.75, 1.0]),
"blob_compaction_readahead_size": lambda: random.choice([0, 1048576, 4194304]),
"blob_file_starting_level": lambda: random.choice(
[0] * 4 + [1] * 3 + [2] * 2 + [3]
),
"use_blob_cache": lambda: random.randint(0, 1),
"use_shared_block_and_blob_cache": lambda: random.randint(0, 1),
"blob_cache_size": lambda: random.choice([1048576, 2097152, 4194304, 8388608]),
"prepopulate_blob_cache": lambda: random.randint(0, 1),
}
ts_params = {
"test_cf_consistency": 0,
"test_batches_snapshots": 0,
"user_timestamp_size": 8,
"delrangepercent": 0,
"delpercent": 5,
"use_merge": 0,
"use_full_merge_v1": 0,
"use_txn": 0,
"enable_blob_files": 0,
"use_blob_db": 0,
"ingest_external_file_one_in": 0,
}
tiered_params = {
"enable_tiered_storage": 1,
# Set tiered compaction hot data time as: 1 minute, 1 hour, 10 hour
"preclude_last_level_data_seconds": lambda: random.choice([60, 3600, 36000]),
# only test universal compaction for now, level has known issue of
# endless compaction
"compaction_style": 1,
# tiered storage doesn't support blob db yet
"enable_blob_files": 0,
"use_blob_db": 0,
}
multiops_txn_default_params = {
"test_cf_consistency": 0,
"test_batches_snapshots": 0,
"test_multi_ops_txns": 1,
"use_txn": 1,
"two_write_queues": lambda: random.choice([0, 1]),
# TODO: enable write-prepared
"disable_wal": 0,
"use_only_the_last_commit_time_batch_for_recovery": lambda: random.choice([0, 1]),
"clear_column_family_one_in": 0,
"column_families": 1,
"enable_pipelined_write": lambda: random.choice([0, 1]),
# This test already acquires snapshots in reads
"acquire_snapshot_one_in": 0,
"backup_one_in": 0,
"writepercent": 0,
"delpercent": 0,
"delrangepercent": 0,
"customopspercent": 80,
"readpercent": 5,
"iterpercent": 15,
"prefixpercent": 0,
"verify_db_one_in": 1000,
"continuous_verification_interval": 1000,
"delay_snapshot_read_one_in": 3,
# 65536 is the smallest possible value for write_buffer_size. Smaller
# values will be sanitized to 65536 during db open. SetOptions currently
# does not sanitize options, but very small write_buffer_size may cause
# assertion failure in
# https://github.com/facebook/rocksdb/blob/7.0.fb/db/memtable.cc#L117.
"write_buffer_size": 65536,
# flush more frequently to generate more files, thus trigger more
# compactions.
"flush_one_in": 1000,
"key_spaces_path": setup_multiops_txn_key_spaces_file(),
"rollback_one_in": 4,
# Re-enable once we have a compaction for MultiOpsTxnStressTest
"enable_compaction_filter": 0,
"create_timestamped_snapshot_one_in": 50,
"sync_fault_injection": 0,
}
multiops_wc_txn_params = {
"txn_write_policy": 0,
# TODO re-enable pipelined write. Not well tested atm
"enable_pipelined_write": 0,
}
multiops_wp_txn_params = {
"txn_write_policy": 1,
"wp_snapshot_cache_bits": 1,
# try small wp_commit_cache_bits, e.g. 0 once we explore storing full
# commit sequence numbers in commit cache
"wp_commit_cache_bits": 10,
# pipeline write is not currnetly compatible with WritePrepared txns
"enable_pipelined_write": 0,
# OpenReadOnly after checkpoint is not currnetly compatible with WritePrepared txns
"checkpoint_one_in": 0,
# Required to be 1 in order to use commit-time-batch
"use_only_the_last_commit_time_batch_for_recovery": 1,
"clear_wp_commit_cache_one_in": 10,
"create_timestamped_snapshot_one_in": 0,
}
def finalize_and_sanitize(src_params):
dest_params = {k: v() if callable(v) else v for (k, v) in src_params.items()}
if is_release_mode():
dest_params["read_fault_one_in"] = 0
if dest_params.get("compression_max_dict_bytes") == 0:
dest_params["compression_zstd_max_train_bytes"] = 0
dest_params["compression_max_dict_buffer_bytes"] = 0
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
dest_params["memtablerep"] = "skip_list"
if dest_params["mmap_read"] == 1:
dest_params["use_direct_io_for_flush_and_compaction"] = 0
dest_params["use_direct_reads"] = 0
if dest_params["file_checksum_impl"] != "none":
# TODO(T109283569): there is a bug in `GenerateOneFileChecksum()`,
# used by `IngestExternalFile()`, causing it to fail with mmap
# reads. Remove this once it is fixed.
dest_params["ingest_external_file_one_in"] = 0
if (
dest_params["use_direct_io_for_flush_and_compaction"] == 1
or dest_params["use_direct_reads"] == 1
) and not is_direct_io_supported(dest_params["db"]):
if is_release_mode():
print(
"{} does not support direct IO. Disabling use_direct_reads and "
"use_direct_io_for_flush_and_compaction.\n".format(dest_params["db"])
)
dest_params["use_direct_reads"] = 0
dest_params["use_direct_io_for_flush_and_compaction"] = 0
else:
dest_params["mock_direct_io"] = True
# Multi-key operations are not currently compatible with transactions or
# timestamp.
if (
dest_params.get("test_batches_snapshots") == 1
or dest_params.get("use_txn") == 1
or dest_params.get("user_timestamp_size") > 0
):
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
dest_params["ingest_external_file_one_in"] = 0
if (
dest_params.get("disable_wal") == 1
or dest_params.get("sync_fault_injection") == 1
):
# File ingestion does not guarantee prefix-recoverability when unsynced
# data can be lost. Ingesting a file syncs data immediately that is
# newer than unsynced memtable data that can be lost on restart.
#
# Even if the above issue is fixed or worked around, our
# trace-and-replay does not trace file ingestion, so in its current form
# it would not recover the expected state to the correct point in time.
dest_params["ingest_external_file_one_in"] = 0
# The `DbStressCompactionFilter` can apply memtable updates to SST
# files, which would be problematic when unsynced data can be lost in
# crash recoveries.
dest_params["enable_compaction_filter"] = 0
# Only under WritePrepared txns, unordered_write would provide the same guarnatees as vanilla rocksdb
if dest_params.get("unordered_write", 0) == 1:
dest_params["txn_write_policy"] = 1
dest_params["allow_concurrent_memtable_write"] = 1
if dest_params.get("disable_wal", 0) == 1:
dest_params["atomic_flush"] = 1
dest_params["sync"] = 0
dest_params["write_fault_one_in"] = 0
if dest_params.get("open_files", 1) != -1:
# Compaction TTL and periodic compactions are only compatible
# with open_files = -1
dest_params["compaction_ttl"] = 0
dest_params["periodic_compaction_seconds"] = 0
if dest_params.get("compaction_style", 0) == 2:
# Disable compaction TTL in FIFO compaction, because right
# now assertion failures are triggered.
dest_params["compaction_ttl"] = 0
dest_params["periodic_compaction_seconds"] = 0
if dest_params["partition_filters"] == 1:
if dest_params["index_type"] != 2:
dest_params["partition_filters"] = 0
if dest_params.get("atomic_flush", 0) == 1:
# disable pipelined write when atomic flush is used.
dest_params["enable_pipelined_write"] = 0
if dest_params.get("sst_file_manager_bytes_per_sec", 0) == 0:
dest_params["sst_file_manager_bytes_per_truncate"] = 0
if dest_params.get("enable_compaction_filter", 0) == 1:
# Compaction filter is incompatible with snapshots. Need to avoid taking
# snapshots, as well as avoid operations that use snapshots for
# verification.
dest_params["acquire_snapshot_one_in"] = 0
dest_params["compact_range_one_in"] = 0
# Give the iterator ops away to reads.
dest_params["readpercent"] += dest_params.get("iterpercent", 10)
dest_params["iterpercent"] = 0
dest_params["test_batches_snapshots"] = 0
if dest_params.get("prefix_size") == -1:
dest_params["readpercent"] += dest_params.get("prefixpercent", 20)
dest_params["prefixpercent"] = 0
dest_params["test_batches_snapshots"] = 0
if (
dest_params.get("prefix_size") == -1
and dest_params.get("memtable_whole_key_filtering") == 0
):
dest_params["memtable_prefix_bloom_size_ratio"] = 0
if dest_params.get("two_write_queues") == 1:
dest_params["enable_pipelined_write"] = 0
if dest_params.get("best_efforts_recovery") == 1:
dest_params["disable_wal"] = 1
dest_params["atomic_flush"] = 0
dest_params["enable_compaction_filter"] = 0
dest_params["sync"] = 0
dest_params["write_fault_one_in"] = 0
if dest_params["secondary_cache_uri"] != "":
# Currently the only cache type compatible with a secondary cache is LRUCache
dest_params["cache_type"] = "lru_cache"
# Remove the following once write-prepared/write-unprepared with/without
# unordered write supports timestamped snapshots
if dest_params.get("create_timestamped_snapshot_one_in", 0) > 0:
dest_params["txn_write_policy"] = 0
dest_params["unordered_write"] = 0
# For TransactionDB, correctness testing with unsync data loss is currently
# compatible with only write committed policy
if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0):
dest_params["sync_fault_injection"] = 0
return dest_params
def gen_cmd_params(args):
params = {}
params.update(default_params)
if args.test_type == "blackbox":
params.update(blackbox_default_params)
if args.test_type == "whitebox":
params.update(whitebox_default_params)
if args.simple:
params.update(simple_default_params)
if args.test_type == "blackbox":
params.update(blackbox_simple_default_params)
if args.test_type == "whitebox":
params.update(whitebox_simple_default_params)
if args.cf_consistency:
params.update(cf_consistency_params)
if args.txn:
params.update(txn_params)
if args.test_best_efforts_recovery:
params.update(best_efforts_recovery_params)
if args.enable_ts:
params.update(ts_params)
if args.test_multiops_txn:
params.update(multiops_txn_default_params)
if args.write_policy == "write_committed":
params.update(multiops_wc_txn_params)
elif args.write_policy == "write_prepared":
params.update(multiops_wp_txn_params)
if args.test_tiered_storage:
params.update(tiered_params)
# Best-effort recovery, user defined timestamp, tiered storage are currently
# incompatible with BlobDB. Test BE recovery if specified on the command
# line; otherwise, apply BlobDB related overrides with a 10% chance.
if (
not args.test_best_efforts_recovery
and not args.enable_ts
and not args.test_tiered_storage
and random.choice([0] * 9 + [1]) == 1
):
params.update(blob_params)
for k, v in vars(args).items():
if v is not None:
params[k] = v
return params
def gen_cmd(params, unknown_params):
finalzied_params = finalize_and_sanitize(params)
cmd = (
[stress_cmd]
+ [
"--{0}={1}".format(k, v)
for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k
not in {
"test_type",
"simple",
"duration",
"interval",
"random_kill_odd",
"cf_consistency",
"txn",
"test_best_efforts_recovery",
"enable_ts",
"test_multiops_txn",
"write_policy",
"stress_cmd",
"test_tiered_storage",
}
and v is not None
]
+ unknown_params
)
return cmd
def execute_cmd(cmd, timeout):
child = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
print("Running db_stress with pid=%d: %s\n\n" % (child.pid, " ".join(cmd)))
try:
outs, errs = child.communicate(timeout=timeout)
hit_timeout = False
print("WARNING: db_stress ended before kill: exitcode=%d\n" % child.returncode)
except subprocess.TimeoutExpired:
hit_timeout = True
child.kill()
print("KILLED %d\n" % child.pid)
outs, errs = child.communicate()
return hit_timeout, child.returncode, outs.decode("utf-8"), errs.decode("utf-8")
# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
cmd_params = gen_cmd_params(args)
dbname = get_dbname("blackbox")
exit_time = time.time() + cmd_params["duration"]
print(
"Running blackbox-crash-test with \n"
+ "interval_between_crash="
+ str(cmd_params["interval"])
+ "\n"
+ "total-duration="
+ str(cmd_params["duration"])
+ "\n"
)
while time.time() < exit_time:
cmd = gen_cmd(
dict(list(cmd_params.items()) + list({"db": dbname}.items())), unknown_args
)
hit_timeout, retcode, outs, errs = execute_cmd(cmd, cmd_params["interval"])
if not hit_timeout:
print("Exit Before Killing")
print("stdout:")
print(outs)
print("stderr:")
print(errs)
sys.exit(2)
for line in errs.split("\n"):
if line != "" and not line.startswith("WARNING"):
print("stderr has error message:")
print("***" + line + "***")
time.sleep(1) # time to stabilize before the next run
time.sleep(1) # time to stabilize before the next run
# we need to clean up after ourselves -- only do this on test success
shutil.rmtree(dbname, True)
# This python script runs db_stress multiple times. Some runs with
# kill_random_test that causes rocksdb to crash at various points in code.
def whitebox_crash_main(args, unknown_args):
cmd_params = gen_cmd_params(args)
dbname = get_dbname("whitebox")
cur_time = time.time()
exit_time = cur_time + cmd_params["duration"]
half_time = cur_time + cmd_params["duration"] // 2
print(
"Running whitebox-crash-test with \n"
+ "total-duration="
+ str(cmd_params["duration"])
+ "\n"
)
total_check_mode = 4
check_mode = 0
kill_random_test = cmd_params["random_kill_odd"]
kill_mode = 0
while time.time() < exit_time:
if check_mode == 0:
additional_opts = {
# use large ops per thread since we will kill it anyway
"ops_per_thread": 100
* cmd_params["ops_per_thread"],
}
# run with kill_random_test, with three modes.
# Mode 0 covers all kill points. Mode 1 covers less kill points but
# increases change of triggering them. Mode 2 covers even less
# frequent kill points and further increases triggering change.
if kill_mode == 0:
additional_opts.update(
{
"kill_random_test": kill_random_test,
}
)
elif kill_mode == 1:
if cmd_params.get("disable_wal", 0) == 1:
my_kill_odd = kill_random_test // 50 + 1
else:
my_kill_odd = kill_random_test // 10 + 1
additional_opts.update(
{
"kill_random_test": my_kill_odd,
"kill_exclude_prefixes": "WritableFileWriter::Append,"
+ "WritableFileWriter::WriteBuffered",
}
)
elif kill_mode == 2:
# TODO: May need to adjust random odds if kill_random_test
# is too small.
additional_opts.update(
{
"kill_random_test": (kill_random_test // 5000 + 1),
"kill_exclude_prefixes": "WritableFileWriter::Append,"
"WritableFileWriter::WriteBuffered,"
"PosixMmapFile::Allocate,WritableFileWriter::Flush",
}
)
# Run kill mode 0, 1 and 2 by turn.
kill_mode = (kill_mode + 1) % 3
elif check_mode == 1:
# normal run with universal compaction mode
additional_opts = {
"kill_random_test": None,
"ops_per_thread": cmd_params["ops_per_thread"],
"compaction_style": 1,
}
# Single level universal has a lot of special logic. Ensure we cover
# it sometimes.
if random.randint(0, 1) == 1:
additional_opts.update(
{
"num_levels": 1,
}
)
elif check_mode == 2:
# normal run with FIFO compaction mode
# ops_per_thread is divided by 5 because FIFO compaction
# style is quite a bit slower on reads with lot of files
additional_opts = {
"kill_random_test": None,
"ops_per_thread": cmd_params["ops_per_thread"] // 5,
"compaction_style": 2,
}
else:
# normal run
additional_opts = {
"kill_random_test": None,
"ops_per_thread": cmd_params["ops_per_thread"],
}
cmd = gen_cmd(
dict(
list(cmd_params.items())
+ list(additional_opts.items())
+ list({"db": dbname}.items())
),
unknown_args,
)
print(
"Running:" + " ".join(cmd) + "\n"
) # noqa: E999 T25377293 Grandfathered in
# If the running time is 15 minutes over the run time, explicit kill and
# exit even if white box kill didn't hit. This is to guarantee run time
# limit, as if it runs as a job, running too long will create problems
# for job scheduling or execution.
# TODO detect a hanging condition. The job might run too long as RocksDB
# hits a hanging bug.
hit_timeout, retncode, stdoutdata, stderrdata = execute_cmd(
cmd, exit_time - time.time() + 900
)
msg = "check_mode={0}, kill option={1}, exitcode={2}\n".format(
check_mode, additional_opts["kill_random_test"], retncode
)
print(msg)
print(stdoutdata)
print(stderrdata)
if hit_timeout:
print("Killing the run for running too long")
break
expected = False
if additional_opts["kill_random_test"] is None and (retncode == 0):
# we expect zero retncode if no kill option
expected = True
elif additional_opts["kill_random_test"] is not None and retncode <= 0:
# When kill option is given, the test MIGHT kill itself.
# If it does, negative retncode is expected. Otherwise 0.
expected = True
if not expected:
print("TEST FAILED. See kill option and exit code above!!!\n")
sys.exit(1)
stderrdata = stderrdata.lower()
errorcount = stderrdata.count("error") - stderrdata.count("got errors 0 times")
print("#times error occurred in output is " + str(errorcount) + "\n")
if errorcount > 0:
print("TEST FAILED. Output has 'error'!!!\n")
sys.exit(2)
if stderrdata.find("fail") >= 0:
print("TEST FAILED. Output has 'fail'!!!\n")
sys.exit(2)
# First half of the duration, keep doing kill test. For the next half,
# try different modes.
if time.time() > half_time:
# we need to clean up after ourselves -- only do this on test
# success
shutil.rmtree(dbname, True)
os.mkdir(dbname)
cmd_params.pop("expected_values_dir", None)
check_mode = (check_mode + 1) % total_check_mode
time.sleep(1) # time to stabilize after a kill
def main():
global stress_cmd
parser = argparse.ArgumentParser(
description="This script runs and kills \
db_stress multiple times"
)
parser.add_argument("test_type", choices=["blackbox", "whitebox"])
parser.add_argument("--simple", action="store_true")
parser.add_argument("--cf_consistency", action="store_true")
parser.add_argument("--txn", action="store_true")
parser.add_argument("--test_best_efforts_recovery", action="store_true")
parser.add_argument("--enable_ts", action="store_true")
parser.add_argument("--test_multiops_txn", action="store_true")
parser.add_argument("--write_policy", choices=["write_committed", "write_prepared"])
parser.add_argument("--stress_cmd")
parser.add_argument("--test_tiered_storage", action="store_true")
all_params = dict(
list(default_params.items())
+ list(blackbox_default_params.items())
+ list(whitebox_default_params.items())
+ list(simple_default_params.items())
+ list(blackbox_simple_default_params.items())
+ list(whitebox_simple_default_params.items())
+ list(blob_params.items())
+ list(ts_params.items())
+ list(multiops_txn_default_params.items())
+ list(multiops_wc_txn_params.items())
+ list(multiops_wp_txn_params.items())
+ list(best_efforts_recovery_params.items())
+ list(cf_consistency_params.items())
+ list(tiered_params.items())
+ list(txn_params.items())
)
for k, v in all_params.items():
parser.add_argument("--" + k, type=type(v() if callable(v) else v))
# unknown_args are passed directly to db_stress
args, unknown_args = parser.parse_known_args()
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is not None and not os.path.isdir(test_tmpdir):
print(
"%s env var is set to a non-existent directory: %s"
% (_TEST_DIR_ENV_VAR, test_tmpdir)
)
sys.exit(1)
if args.stress_cmd:
stress_cmd = args.stress_cmd
if args.test_type == "blackbox":
blackbox_crash_main(args, unknown_args)
if args.test_type == "whitebox":
whitebox_crash_main(args, unknown_args)
# Only delete the `expected_values_dir` if test passes
if expected_values_dir is not None:
shutil.rmtree(expected_values_dir)
if multiops_txn_key_spaces_file is not None:
os.remove(multiops_txn_key_spaces_file)
if __name__ == "__main__":
main()