Add the integrated BlobDB to the stress/crash tests (#7900)

Summary:
The patch adds support for the options related to the new BlobDB implementation
to `db_stress`, including support for dynamically adjusting them using `SetOptions`
when `set_options_one_in` and a new flag `allow_setting_blob_options_dynamically`
are specified. (The latter is used to prevent the options from being enabled when
incompatible features are in use.)

The patch also updates the `db_stress` help messages of the existing stacked BlobDB
related options to clarify that they pertain to the old implementation. In addition, it
adds the new BlobDB to the crash test script. In order to prevent a combinatorial explosion
of jobs and still perform whitebox/blackbox testing (including under ASAN/TSAN/UBSAN),
and to also test BlobDB in conjunction with atomic flush and transactions, the script sets
the BlobDB options in 10% of normal/`cf_consistency`/`txn` crash test runs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7900

Test Plan: Ran `make check` and `db_stress`/`db_crashtest.py` with various options.

Reviewed By: jay-zhuang

Differential Revision: D26094913

Pulled By: ltamasi

fbshipit-source-id: c2ef3391a05e43a9687f24e297df05f4a5584814
This commit is contained in:
Levi Tamasi 2021-02-02 11:39:20 -08:00 committed by Facebook GitHub Bot
parent 108e6b6354
commit 0288bdbc53
5 changed files with 154 additions and 14 deletions

View file

@ -232,6 +232,7 @@ DECLARE_int32(get_property_one_in);
DECLARE_string(file_checksum_impl);
#ifndef ROCKSDB_LITE
// Options for StackableDB-based BlobDB
DECLARE_bool(use_blob_db);
DECLARE_uint64(blob_db_min_blob_size);
DECLARE_uint64(blob_db_bytes_per_sync);
@ -239,6 +240,16 @@ DECLARE_uint64(blob_db_file_size);
DECLARE_bool(blob_db_enable_gc);
DECLARE_double(blob_db_gc_cutoff);
#endif // !ROCKSDB_LITE
// Options for integrated BlobDB
DECLARE_bool(allow_setting_blob_options_dynamically);
DECLARE_bool(enable_blob_files);
DECLARE_uint64(min_blob_size);
DECLARE_uint64(blob_file_size);
DECLARE_string(blob_compression_type);
DECLARE_bool(enable_blob_garbage_collection);
DECLARE_double(blob_garbage_collection_age_cutoff);
DECLARE_int32(approximate_size_one_in);
DECLARE_bool(sync_fault_injection);

View file

@ -325,33 +325,68 @@ DEFINE_bool(enable_write_thread_adaptive_yield, true,
"Use a yielding spin loop for brief writer thread waits.");
#ifndef ROCKSDB_LITE
// BlobDB Options
DEFINE_bool(use_blob_db, false, "Use BlobDB.");
// Options for StackableDB-based BlobDB
DEFINE_bool(use_blob_db, false, "[Stacked BlobDB] Use BlobDB.");
DEFINE_uint64(blob_db_min_blob_size,
DEFINE_uint64(
blob_db_min_blob_size,
ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().min_blob_size,
"Smallest blob to store in a file. Blobs smaller than this "
"will be inlined with the key in the LSM tree.");
"[Stacked BlobDB] Smallest blob to store in a file. Blobs "
"smaller than this will be inlined with the key in the LSM tree.");
DEFINE_uint64(blob_db_bytes_per_sync,
DEFINE_uint64(
blob_db_bytes_per_sync,
ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().bytes_per_sync,
"Sync blob files once per every N bytes written.");
"[Stacked BlobDB] Sync blob files once per every N bytes written.");
DEFINE_uint64(blob_db_file_size,
ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().blob_file_size,
"Target size of each blob file.");
"[Stacked BlobDB] Target size of each blob file.");
DEFINE_bool(
blob_db_enable_gc,
ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().enable_garbage_collection,
"Enable BlobDB garbage collection.");
"[Stacked BlobDB] Enable BlobDB garbage collection.");
DEFINE_double(
blob_db_gc_cutoff,
ROCKSDB_NAMESPACE::blob_db::BlobDBOptions().garbage_collection_cutoff,
"Cutoff ratio for BlobDB garbage collection.");
"[Stacked BlobDB] Cutoff ratio for BlobDB garbage collection.");
#endif // !ROCKSDB_LITE
// Options for integrated BlobDB
DEFINE_bool(allow_setting_blob_options_dynamically, false,
"[Integrated BlobDB] Allow setting blob options dynamically.");
DEFINE_bool(
enable_blob_files,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().enable_blob_files,
"[Integrated BlobDB] Enable writing large values to separate blob files.");
DEFINE_uint64(min_blob_size,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().min_blob_size,
"[Integrated BlobDB] The size of the smallest value to be stored "
"separately in a blob file.");
DEFINE_uint64(blob_file_size,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions().blob_file_size,
"[Integrated BlobDB] The size limit for blob files.");
DEFINE_string(blob_compression_type, "none",
"[Integrated BlobDB] The compression algorithm to use for large "
"values stored in blob files.");
DEFINE_bool(enable_blob_garbage_collection,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions()
.enable_blob_garbage_collection,
"[Integrated BlobDB] Enable blob garbage collection.");
DEFINE_double(blob_garbage_collection_age_cutoff,
ROCKSDB_NAMESPACE::AdvancedColumnFamilyOptions()
.blob_garbage_collection_age_cutoff,
"[Integrated BlobDB] The cutoff in terms of blob file age for "
"garbage collection.");
static const bool FLAGS_subcompactions_dummy __attribute__((__unused__)) =
RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);

View file

@ -108,6 +108,22 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
}
}
std::vector<std::string> StressTest::GetBlobCompressionTags() {
std::vector<std::string> compression_tags{"kNoCompression"};
if (Snappy_Supported()) {
compression_tags.emplace_back("kSnappyCompression");
}
if (LZ4_Supported()) {
compression_tags.emplace_back("kLZ4Compression");
}
if (ZSTD_Supported()) {
compression_tags.emplace_back("kZSTD");
}
return compression_tags;
}
bool StressTest::BuildOptionsTable() {
if (FLAGS_set_options_one_in <= 0) {
return true;
@ -186,6 +202,21 @@ bool StressTest::BuildOptionsTable() {
{"max_sequential_skip_in_iterations", {"4", "8", "12"}},
};
if (FLAGS_allow_setting_blob_options_dynamically) {
options_tbl.emplace("enable_blob_files",
std::vector<std::string>{"false", "true"});
options_tbl.emplace("min_blob_size",
std::vector<std::string>{"0", "16", "256"});
options_tbl.emplace("blob_file_size",
std::vector<std::string>{"1M", "16M", "256M", "1G"});
options_tbl.emplace("blob_compression_type", GetBlobCompressionTags());
options_tbl.emplace("enable_blob_garbage_collection",
std::vector<std::string>{"false", "true"});
options_tbl.emplace(
"blob_garbage_collection_age_cutoff",
std::vector<std::string>{"0.0", "0.25", "0.5", "0.75", "1.0"});
}
options_table_ = std::move(options_tbl);
for (const auto& iter : options_table_) {
@ -1869,7 +1900,7 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false");
#ifndef ROCKSDB_LITE
fprintf(stdout, "BlobDB : %s\n",
fprintf(stdout, "Stacked BlobDB : %s\n",
FLAGS_use_blob_db ? "true" : "false");
#endif // !ROCKSDB_LITE
fprintf(stdout, "Read only mode : %s\n",
@ -2083,6 +2114,17 @@ void StressTest::Open() {
options_.file_checksum_gen_factory =
GetFileChecksumImpl(FLAGS_file_checksum_impl);
options_.track_and_verify_wals_in_manifest = true;
// Integrated BlobDB
options_.enable_blob_files = FLAGS_enable_blob_files;
options_.min_blob_size = FLAGS_min_blob_size;
options_.blob_file_size = FLAGS_blob_file_size;
options_.blob_compression_type =
StringToCompressionType(FLAGS_blob_compression_type.c_str());
options_.enable_blob_garbage_collection =
FLAGS_enable_blob_garbage_collection;
options_.blob_garbage_collection_age_cutoff =
FLAGS_blob_garbage_collection_age_cutoff;
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");
@ -2172,6 +2214,31 @@ void StressTest::Open() {
options_.best_efforts_recovery = FLAGS_best_efforts_recovery;
options_.paranoid_file_checks = FLAGS_paranoid_file_checks;
if ((options_.enable_blob_files || options_.enable_blob_garbage_collection ||
FLAGS_allow_setting_blob_options_dynamically) &&
(FLAGS_use_merge || FLAGS_enable_compaction_filter ||
FLAGS_checkpoint_one_in > 0 || FLAGS_backup_one_in > 0 ||
FLAGS_best_efforts_recovery)) {
fprintf(
stderr,
"Integrated BlobDB is currently incompatible with Merge, compaction "
"filters, checkpoints, backup/restore, and best-effort recovery\n");
exit(1);
}
if (options_.enable_blob_files) {
fprintf(stdout,
"Integrated BlobDB: blob files enabled, min blob size %" PRIu64
", blob file size %" PRIu64 ", blob compression type %s\n",
options_.min_blob_size, options_.blob_file_size,
CompressionTypeToString(options_.blob_compression_type).c_str());
}
if (options_.enable_blob_garbage_collection) {
fprintf(stdout, "Integrated BlobDB: blob GC enabled, cutoff %f\n",
options_.blob_garbage_collection_age_cutoff);
}
fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());
Status s;
@ -2229,6 +2296,7 @@ void StressTest::Open() {
options_.create_missing_column_families = true;
if (!FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
// StackableDB-based BlobDB
if (FLAGS_use_blob_db) {
blob_db::BlobDBOptions blob_db_options;
blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;

View file

@ -24,6 +24,8 @@ class StressTest {
std::shared_ptr<Cache> NewCache(size_t capacity);
static std::vector<std::string> GetBlobCompressionTags();
bool BuildOptionsTable();
void InitDb();

View file

@ -265,6 +265,23 @@ best_efforts_recovery_params = {
"continuous_verification_interval": 0,
}
blob_params = {
"allow_setting_blob_options_dynamically": 1,
# Enable blob files and GC with a 75% chance initially; note that they might still be
# enabled/disabled during the test via SetOptions
"enable_blob_files": lambda: random.choice([0] + [1] * 3),
"min_blob_size": lambda: random.choice([0, 16, 256]),
"blob_file_size": lambda: random.choice([1048576, 16777216, 268435456, 1073741824]),
"blob_compression_type": lambda: random.choice(["none", "snappy", "lz4", "zstd"]),
"enable_blob_garbage_collection": lambda: random.choice([0] + [1] * 3),
"blob_garbage_collection_age_cutoff": lambda: random.choice([0.0, 0.25, 0.5, 0.75, 1.0]),
# The following are currently incompatible with the integrated BlobDB
"use_merge": 0,
"enable_compaction_filter": 0,
"backup_one_in": 0,
"checkpoint_one_in": 0,
}
def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()])
@ -356,6 +373,12 @@ def gen_cmd_params(args):
if args.test_best_efforts_recovery:
params.update(best_efforts_recovery_params)
# Best-effort recovery and BlobDB are currently incompatible. Test BE recovery
# if specified on the command line; otherwise, apply BlobDB related overrides
# with a 10% chance.
if not args.test_best_efforts_recovery and random.choice([0] * 9 + [1]) == 1:
params.update(blob_params)
for k, v in vars(args).items():
if v is not None:
params[k] = v
@ -628,7 +651,8 @@ def main():
+ list(whitebox_default_params.items())
+ list(simple_default_params.items())
+ list(blackbox_simple_default_params.items())
+ list(whitebox_simple_default_params.items()))
+ list(whitebox_simple_default_params.items())
+ list(blob_params.items()))
for k, v in all_params.items():
parser.add_argument("--" + k, type=type(v() if callable(v) else v))