Add experimental range filters to stress/crash test (#12769)

Summary:
Implemented two key segment extractors that satisfy the "segment prefix property," one with variable segment widths and one with fixed. Used these to create a couple of named configs and versions that are randomly selected by the crash test. On the read side, the required table_filter is set up everywhere I found the stress test uses iterate_upper_bound.

Writing filters on new SST files and applying filters on SST files to range queries are configured independently, to potentially help with isolating different sides of the functionality.

Not yet implemented / possible follow-up:
* Consider manipulating/skewing the query bounds to better exercise filters
* Not yet using categories in the extractors
* Not yet dynamically changing the filtering version

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12769

Test Plan: Some stress test trial runs, including with ASAN. Inserted some temporary probes to ensure code was being exercised (more or less) as intended.

Reviewed By: hx235

Differential Revision: D58547462

Pulled By: pdillinger

fbshipit-source-id: f7b1596dd668426268c5293ac17615f749703f52
This commit is contained in:
Peter Dillinger 2024-06-18 16:16:09 -07:00 committed by Facebook GitHub Bot
parent f26e2fedb3
commit 71f9e6b5b3
14 changed files with 197 additions and 8 deletions

View File

@ -395,6 +395,7 @@ rocks_cpp_library_wrapper(name="rocksdb_stress_lib", srcs=[
"db_stress_tool/cf_consistency_stress.cc",
"db_stress_tool/db_stress_common.cc",
"db_stress_tool/db_stress_driver.cc",
"db_stress_tool/db_stress_filters.cc",
"db_stress_tool/db_stress_gflags.cc",
"db_stress_tool/db_stress_listener.cc",
"db_stress_tool/db_stress_shared_state.cc",

View File

@ -4,6 +4,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX}
db_stress.cc
db_stress_common.cc
db_stress_driver.cc
db_stress_filters.cc
db_stress_gflags.cc
db_stress_listener.cc
db_stress_shared_state.cc

View File

@ -590,6 +590,11 @@ class BatchedOpsStressTest : public StressTest {
// For half of the time, set the upper bound to the next prefix
ub_slices[i] = upper_bounds[i];
ro_copies[i].iterate_upper_bound = &(ub_slices[i]);
if (FLAGS_use_sqfc_for_range_queries) {
ro_copies[i].table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix_slices[i],
ub_slices[i]);
}
}
iters[i].reset(db_->NewIterator(ro_copies[i], cfh));

View File

@ -746,6 +746,10 @@ class CfConsistencyStressTest : public StressTest {
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
if (FLAGS_use_sqfc_for_range_queries) {
ro_copy.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
}
}
ColumnFamilyHandle* const cfh =

View File

@ -170,6 +170,9 @@ DECLARE_int32(bloom_before_level);
DECLARE_bool(partition_filters);
DECLARE_bool(optimize_filters_for_memory);
DECLARE_bool(detect_filter_construct_corruption);
DECLARE_string(sqfc_name);
DECLARE_uint32(sqfc_version);
DECLARE_bool(use_sqfc_for_range_queries);
DECLARE_int32(index_type);
DECLARE_int32(data_block_index_type);
DECLARE_string(db);

View File

@ -0,0 +1,93 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db_stress_tool/db_stress_filters.h"
#include <memory>
#include <mutex>
namespace ROCKSDB_NAMESPACE {
#ifdef GFLAGS
using experimental::KeySegmentsExtractor;
using experimental::MakeSharedBytewiseMinMaxSQFC;
using experimental::SelectKeySegment;
using experimental::SstQueryFilterConfigs;
using experimental::SstQueryFilterConfigsManager;
namespace {
// Key segment extractor producing variable-width segments. A segment ends
// right after any zero byte that directly follows a non-zero byte (the
// delimiter is included in the segment it terminates), and a final segment
// always ends at the last byte of the input.
class VariableWidthExtractor : public KeySegmentsExtractor {
 public:
  const char* Name() const override { return "VariableWidthExtractor"; }

  // Appends the (exclusive) end offset of every segment found in
  // `key_or_bound` to `result->segment_ends`. The key kind is ignored.
  void Extract(const Slice& key_or_bound, KeyKind /*kind*/,
               Result* result) const override {
    const uint32_t total = static_cast<uint32_t>(key_or_bound.size());
    // True when the byte just before `pos` was non-zero; only then does a
    // zero byte at `pos` count as a delimiter.
    bool after_non_zero = false;
    for (uint32_t pos = 0; pos < total; ++pos) {
      const bool is_zero = key_or_bound[pos] == 0;
      const bool at_delimiter = after_non_zero && is_zero;
      const bool at_last_byte = pos + 1 == total;
      if (at_delimiter || at_last_byte) {
        result->segment_ends.push_back(pos + 1);
      }
      after_non_zero = !is_zero;
    }
  }
};

const auto kVariableWidthExtractor = std::make_shared<VariableWidthExtractor>();
// Key segment extractor producing fixed 8-byte segments, with any trailing
// bytes that do not fill a whole segment placed in one extra, shorter
// segment.
class FixedWidthExtractor : public KeySegmentsExtractor {
 public:
  const char* Name() const override { return "FixedWidthExtractor"; }

  // Appends the (exclusive) end offset of every segment of `key_or_bound`
  // to `result->segment_ends`. The key kind is ignored.
  void Extract(const Slice& key_or_bound, KeyKind /*kind*/,
               Result* result) const override {
    const uint32_t total = static_cast<uint32_t>(key_or_bound.size());
    // End offset of the last full 8-byte segment emitted so far.
    uint32_t consumed = 0;
    for (uint32_t next_end = 8; next_end <= total; next_end += 8) {
      result->segment_ends.push_back(next_end);
      consumed = next_end;
    }
    // Leftover bytes (fewer than 8) form a final short segment.
    if (consumed < total) {
      result->segment_ends.push_back(total);
    }
  }
};

const auto kFixedWidthExtractor = std::make_shared<FixedWidthExtractor>();
// Bytewise min/max filters, one per key segment index (0 through 3), shared
// by the named configs below.
// NOTE: MinMax filter on segment 0 is not normally useful because of metadata
// on smallest and largest key for each SST file. But it doesn't hurt to test.
const auto kFilter0 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(0));
const auto kFilter1 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(1));
const auto kFilter2 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(2));
const auto kFilter3 = MakeSharedBytewiseMinMaxSQFC(SelectKeySegment(3));
// "foo" as of version 1: filters on segments 0 and 2, variable-width segments.
const SstQueryFilterConfigs fooConfigs1{{kFilter0, kFilter2},
kVariableWidthExtractor};
// "foo" as of version 2: filters on segments 1 and 3, variable-width segments.
const SstQueryFilterConfigs fooConfigs2{{kFilter1, kFilter3},
kVariableWidthExtractor};
// "bar" as of version 2: filters on segments 1 and 2, fixed-width segments.
const SstQueryFilterConfigs barConfigs2{{kFilter1, kFilter2},
kFixedWidthExtractor};
// Configs table handed to SstQueryFilterConfigsManager::MakeShared: the outer
// key is the filtering version, the inner key is the config name. Note that
// "bar" has no entry for version 1.
const SstQueryFilterConfigsManager::Data data = {
{1, {{"foo", fooConfigs1}}},
{2, {{"foo", fooConfigs2}, {"bar", barConfigs2}}},
};
} // namespace
// Returns the single SstQueryFilterConfigsManager used by db_stress, built
// from the file-local `data` table on first use. Construction failure is
// treated as a programming error (checked with assert).
SstQueryFilterConfigsManager& DbStressSqfcManager() {
  // The once_flag MUST have static storage duration. With a plain local
  // flag, every call would see a fresh flag, re-run the initializer, and
  // replace `mgr` -- racing with concurrent callers and potentially
  // invalidating references returned by earlier calls.
  static std::once_flag flag;
  static std::shared_ptr<SstQueryFilterConfigsManager> mgr;
  std::call_once(flag, []() {
    Status s = SstQueryFilterConfigsManager::MakeShared(data, &mgr);
    assert(s.ok());
    assert(mgr);
  });
  return *mgr;
}
#endif // GFLAGS
} // namespace ROCKSDB_NAMESPACE

View File

@ -0,0 +1,16 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifdef GFLAGS
#pragma once
#include "rocksdb/experimental.h"
namespace ROCKSDB_NAMESPACE {
// Returns a reference to the SstQueryFilterConfigsManager holding the named,
// versioned SST query filter configs defined for db_stress (see
// db_stress_filters.cc).
experimental::SstQueryFilterConfigsManager& DbStressSqfcManager();
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS

View File

@ -582,6 +582,16 @@ DEFINE_bool(
.detect_filter_construct_corruption,
"Detect corruption during new Bloom Filter and Ribbon Filter construction");
DEFINE_string(sqfc_name, "foo",
"Config name to select from SstQueryFilterConfigsManager.");
DEFINE_uint32(sqfc_version, 0,
"User-defined filtering version to select from "
"SstQueryFilterConfigsManager. 0 = disable writing filters");
DEFINE_bool(use_sqfc_for_range_queries, true,
"Apply SstQueryFilters to range queries");
DEFINE_int32(
index_type,
static_cast<int32_t>(

View File

@ -18,6 +18,7 @@
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_compaction_filter.h"
#include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_filters.h"
#include "db_stress_tool/db_stress_table_properties_collector.h"
#include "db_stress_tool/db_stress_wide_merge_operator.h"
#include "options/options_parser.h"
@ -90,6 +91,14 @@ StressTest::StressTest()
exit(1);
}
}
Status s = DbStressSqfcManager().MakeSharedFactory(
FLAGS_sqfc_name, FLAGS_sqfc_version, &sqfc_factory_);
if (!s.ok()) {
fprintf(stderr, "Error initializing SstQueryFilterConfig: %s\n",
s.ToString().c_str());
exit(1);
}
}
StressTest::~StressTest() {
@ -1474,8 +1483,8 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
}
std::string upper_bound_str;
Slice upper_bound;
if (thread->rand.OneIn(16)) {
// With a 1/16 chance, set an iterator upper bound.
// Prefer no bound with no range query filtering; prefer bound with it
if (FLAGS_use_sqfc_for_range_queries ^ thread->rand.OneIn(16)) {
// Note: upper_bound can be smaller than the seek key.
const int64_t rand_upper_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
upper_bound_str = Key(rand_upper_key);
@ -1485,8 +1494,7 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
std::string lower_bound_str;
Slice lower_bound;
if (thread->rand.OneIn(16)) {
// With a 1/16 chance, enable iterator lower bound.
if (FLAGS_use_sqfc_for_range_queries ^ thread->rand.OneIn(16)) {
// Note: lower_bound can be greater than the seek key.
const int64_t rand_lower_key = GenerateOneKey(thread, FLAGS_ops_per_thread);
lower_bound_str = Key(rand_lower_key);
@ -1494,6 +1502,12 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
ro.iterate_lower_bound = &lower_bound;
}
if (FLAGS_use_sqfc_for_range_queries && ro.iterate_upper_bound &&
ro.iterate_lower_bound) {
ro.table_filter = sqfc_factory_->GetTableFilterForRangeQuery(
*ro.iterate_lower_bound, *ro.iterate_upper_bound);
}
std::unique_ptr<IterType> iter = new_iter_func(ro);
std::vector<std::string> key_strs;
@ -1518,8 +1532,10 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
op_logs = "(cleared...)\n";
}
if (ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
if (!FLAGS_use_sqfc_for_range_queries &&
ro.iterate_upper_bound != nullptr && thread->rand.OneIn(2)) {
// With a 1/2 chance, change the upper bound.
// Not compatible with sqfc range filter.
// It is possible that it is changed before first use, but there is no
// problem with that.
const int64_t rand_upper_key =
@ -1527,8 +1543,10 @@ Status StressTest::TestIterateImpl(ThreadState* thread,
upper_bound_str = Key(rand_upper_key);
upper_bound = Slice(upper_bound_str);
}
if (ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
if (!FLAGS_use_sqfc_for_range_queries &&
ro.iterate_lower_bound != nullptr && thread->rand.OneIn(4)) {
// With a 1/4 chance, change the lower bound.
// Not compatible with sqfc range filter.
// It is possible that it is changed before first use, but there is no
// problem with that.
const int64_t rand_lower_key =
@ -3081,7 +3099,7 @@ void StressTest::Open(SharedState* shared, bool reopen) {
if (!InitializeOptionsFromFile(options_)) {
InitializeOptionsFromFlags(cache_, filter_policy_, options_);
}
InitializeOptionsGeneral(cache_, filter_policy_, options_);
InitializeOptionsGeneral(cache_, filter_policy_, sqfc_factory_, options_);
if (FLAGS_prefix_size == 0 && FLAGS_rep_factory == kHashSkipList) {
fprintf(stderr,
@ -3964,6 +3982,7 @@ void InitializeOptionsFromFlags(
void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<const FilterPolicy>& filter_policy,
const std::shared_ptr<SstQueryFilterConfigsManager::Factory>& sqfc_factory,
Options& options) {
options.create_missing_column_families = true;
options.create_if_missing = true;
@ -4037,6 +4056,10 @@ void InitializeOptionsGeneral(
options.table_properties_collector_factories.emplace_back(
std::make_shared<DbStressTablePropertiesCollectorFactory>());
if (sqfc_factory && !sqfc_factory->GetConfigs().IsEmptyNotFound()) {
options.table_properties_collector_factories.emplace_back(sqfc_factory);
}
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -12,6 +12,7 @@
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "rocksdb/experimental.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
@ -19,6 +20,7 @@ class Transaction;
class TransactionDB;
class OptimisticTransactionDB;
struct TransactionDBOptions;
using experimental::SstQueryFilterConfigsManager;
class StressTest {
public:
@ -319,6 +321,7 @@ class StressTest {
std::unordered_map<std::string, std::vector<std::string>> options_table_;
std::vector<std::string> options_index_;
std::atomic<bool> db_preload_finished_;
std::shared_ptr<SstQueryFilterConfigsManager::Factory> sqfc_factory_;
// Fields used for continuous verification from another thread
DB* cmp_db_;
@ -360,7 +363,9 @@ void InitializeOptionsFromFlags(
// from OPTIONS file.
void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
const std::shared_ptr<const FilterPolicy>& filter_policy,
const std::shared_ptr<SstQueryFilterConfigsManager::Factory>& sqfc_factory,
Options& options);
// If no OPTIONS file is specified, set up `options` so that we can test
// user-defined timestamp which requires `-user_timestamp_size=8`.

View File

@ -756,6 +756,10 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
ropts.iterate_upper_bound = &iter_ub;
ropts.rate_limiter_priority =
FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
if (FLAGS_use_sqfc_for_range_queries) {
ropts.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(old_sk_prefix, iter_ub);
}
it = txn->GetIterator(ropts);
assert(it);
@ -1114,6 +1118,10 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
ropts.snapshot = snapshot;
ropts.total_order_seek = true;
ropts.iterate_upper_bound = &iter_ub;
if (FLAGS_use_sqfc_for_range_queries) {
ropts.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(start_key, iter_ub);
}
std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->Seek(start_key); it->Valid(); it->Next()) {
@ -1622,6 +1630,10 @@ void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) {
ropts.iterate_lower_bound = &pk_lb;
ropts.iterate_upper_bound = &pk_ub;
ropts.total_order_seek = true;
if (FLAGS_use_sqfc_for_range_queries) {
ropts.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(pk_lb, pk_ub);
}
std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
for (it->SeekToFirst(); it->Valid(); it->Next()) {

View File

@ -1375,6 +1375,10 @@ class NonBatchedOpsStressTest : public StressTest {
// For half of the time, set the upper bound to the next prefix
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
if (FLAGS_use_sqfc_for_range_queries) {
ro_copy.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(prefix, ub_slice);
}
}
std::string read_ts_str;
@ -1902,6 +1906,13 @@ class NonBatchedOpsStressTest : public StressTest {
// GetIntVal().
ro.iterate_upper_bound = &max_key_slice;
}
std::string ub_str, lb_str;
if (FLAGS_use_sqfc_for_range_queries) {
ub_str = Key(ub);
lb_str = Key(lb);
ro.table_filter =
sqfc_factory_->GetTableFilterForRangeQuery(lb_str, ub_str);
}
ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
assert(cfh);

1
src.mk
View File

@ -381,6 +381,7 @@ STRESS_LIB_SOURCES = \
db_stress_tool/cf_consistency_stress.cc \
db_stress_tool/db_stress_common.cc \
db_stress_tool/db_stress_driver.cc \
db_stress_tool/db_stress_filters.cc \
db_stress_tool/db_stress_gflags.cc \
db_stress_tool/db_stress_listener.cc \
db_stress_tool/db_stress_shared_state.cc \

View File

@ -130,6 +130,9 @@ default_params = {
"readpercent": 45,
"recycle_log_file_num": lambda: random.randint(0, 1),
"snapshot_hold_ops": 100000,
"sqfc_name": lambda: random.choice(["foo", "bar"]),
# 0 = disable writing SstQueryFilters
"sqfc_version": lambda: random.choice([0, 1, 1, 2, 2]),
"sst_file_manager_bytes_per_sec": lambda: random.choice([0, 104857600]),
"sst_file_manager_bytes_per_truncate": lambda: random.choice([0, 1048576]),
"long_running_snapshots": lambda: random.randint(0, 1),
@ -141,6 +144,7 @@ default_params = {
"unpartitioned_pinning": lambda: random.randint(0, 3),
"use_direct_reads": lambda: random.randint(0, 1),
"use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
"use_sqfc_for_range_queries": lambda: random.choice([0, 1, 1, 1]),
"mock_direct_io": False,
"cache_type": lambda: random.choice(
["lru_cache", "fixed_hyper_clock_cache", "auto_hyper_clock_cache",