rocksdb/db_stress_tool/db_stress_test_base.h
Yanqin Jin b443d24f4d Stop operating on DB in a stress test background thread (#10373)
Summary:
Stress test background threads do not coordinate with test worker
threads for db reopen in the middle of a test run, thus accessing db
obj in a stress test bg thread can race with test workers. Remove the
TimestampedSnapshotThread.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10373

Test Plan:
```
./db_stress --acquire_snapshot_one_in=0 --adaptive_readahead=0 --allow_concurrent_memtable_write=1 \
--allow_data_in_errors=True --async_io=0 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=1 \
--backup_max_size=104857600 --backup_one_in=100000 --batch_protection_bytes_per_key=8 \
--block_size=16384 --bloom_bits=7.580319535285394 --bottommost_compression_type=disable \
--bytes_per_sync=262144 --cache_index_and_filter_blocks=0 --cache_size=8388608 --cache_type=lru_cache \
--charge_compression_dictionary_building_buffer=1 --charge_file_metadata=0 --charge_filter_construction=1 \
--charge_table_reader=0 --checkpoint_one_in=0 --checksum_type=kxxHash64 --clear_column_family_one_in=0 \
--compact_files_one_in=1000000 --compact_range_one_in=0 --compaction_pri=1 --compaction_ttl=0 \
--compression_max_dict_buffer_bytes=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 \
--compression_type=xpress --compression_use_zstd_dict_trainer=1 --compression_zstd_max_train_bytes=0 \
--continuous_verification_interval=0 --create_timestamped_snapshot_one_in=20 --data_block_index_type=0 \
--db=/dev/shm/rocksdb/ --db_write_buffer_size=0 --delpercent=5 --delrangepercent=0 --destroy_db_initially=1 \
--detect_filter_construct_corruption=0 --disable_wal=0 --enable_compaction_filter=1 --enable_pipelined_write=0 \
--fail_if_options_file_error=1 --file_checksum_impl=xxh64 --flush_one_in=1000000 --format_version=2 \
--get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_property_one_in=1000000 \
--get_sorted_wal_files_one_in=0 --index_block_restart_interval=11 --index_type=0 --ingest_external_file_one_in=0 \
--iterpercent=0 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True \
--log2_keys_per_lock=10 --long_running_snapshots=0 --mark_for_compaction_one_file_in=10 \
--max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=25000000 \
--max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=64 \
--max_write_buffer_number=3 --max_write_buffer_size_to_maintain=0 --memtable_prefix_bloom_size_ratio=0.5 \
--memtable_whole_key_filtering=1 --memtablerep=skip_list --mmap_read=0 --mock_direct_io=True \
--nooverwritepercent=1 --open_files=500000 --open_metadata_write_fault_one_in=0 \
--open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=20000 \
--optimize_filters_for_memory=1 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=2 \
--pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefix_size=1 \
--prefixpercent=5 --prepopulate_block_cache=0 --progress_reports=0 --read_fault_one_in=1000 \
--readpercent=55 --recycle_log_file_num=0 --reopen=100 --ribbon_starting_level=8 \
--secondary_cache_fault_one_in=0 --secondary_cache_uri= --snapshot_hold_ops=100000 \
--sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=0 \
--subcompactions=3 --sync=0 --sync_fault_injection=0 --target_file_size_base=2097152 \
--target_file_size_multiplier=2 --test_batches_snapshots=0 --top_level_index_pinning=1 \
--txn_write_policy=0 --unordered_write=0 --unpartitioned_pinning=0 \
--use_direct_io_for_flush_and_compaction=0 --use_direct_reads=1 --use_full_merge_v1=1 \
--use_merge=1 --use_multiget=0 --use_txn=1 --user_timestamp_size=0 --value_size_mult=32 \
--verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 \
--verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=0 --wal_compression=none \
--write_buffer_size=4194304 --write_dbid_to_manifest=0 --writepercent=35
```
make crash_test_with_txn
make crash_test_with_multiops_wc_txn

Reviewed By: jay-zhuang

Differential Revision: D37903189

Pulled By: riversand963

fbshipit-source-id: cd1728ad7ba4ce4cf47af23c4f65dda0956744f9
2022-07-19 11:25:43 -07:00

312 lines
12 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifdef GFLAGS
#pragma once
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
class Transaction;
class TransactionDB;
struct TransactionDBOptions;
class StressTest {
public:
StressTest();
virtual ~StressTest();
std::shared_ptr<Cache> NewCache(size_t capacity, int32_t num_shard_bits);
static std::vector<std::string> GetBlobCompressionTags();
bool BuildOptionsTable();
void InitDb(SharedState*);
// The initialization work is split into two parts to avoid a circular
// dependency with `SharedState`.
virtual void FinishInitDb(SharedState*);
void TrackExpectedState(SharedState* shared);
void OperateDb(ThreadState* thread);
virtual void VerifyDb(ThreadState* thread) const = 0;
virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0;
void PrintStatistics();
protected:
Status AssertSame(DB* db, ColumnFamilyHandle* cf,
ThreadState::SnapshotState& snap_state);
// Currently PreloadDb has to be single-threaded.
void PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
SharedState* shared);
Status SetOptions(ThreadState* thread);
#ifndef ROCKSDB_LITE
Status NewTxn(WriteOptions& write_opts, Transaction** txn);
Status CommitTxn(Transaction* txn, ThreadState* thread = nullptr);
Status RollbackTxn(Transaction* txn);
#endif
virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {}
virtual bool ShouldAcquireMutexOnKey() const { return false; }
// Returns true if DB state is tracked by the stress test.
virtual bool IsStateTracked() const = 0;
virtual std::vector<int> GenerateColumnFamilies(
const int /* num_column_families */, int rand_column_family) const {
return {rand_column_family};
}
virtual std::vector<int64_t> GenerateKeys(int64_t rand_key) const {
return {rand_key};
}
virtual Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual std::vector<Status> TestMultiGet(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) = 0;
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& read_opts,
const std::vector<int>& cf_ids,
const std::vector<int64_t>& keys, char (&value)[100],
std::unique_ptr<MutexLock>& lock) = 0;
virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) = 0;
virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) = 0;
virtual void TestIngestExternalFile(
ThreadState* thread, const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) = 0;
// Issue compact range, starting with start_key, whose integer value
// is rand_key.
virtual void TestCompactRange(ThreadState* thread, int64_t rand_key,
const Slice& start_key,
ColumnFamilyHandle* column_family);
// Calculate a hash value for all keys in range [start_key, end_key]
// at a certain snapshot.
uint32_t GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
ColumnFamilyHandle* column_family,
const Slice& start_key, const Slice& end_key);
// Return a column family handle that mirrors what is pointed by
// `column_family_id`, which will be used to validate data to be correct.
// By default, the column family itself will be returned.
virtual ColumnFamilyHandle* GetControlCfh(ThreadState* /* thread*/,
int column_family_id) {
return column_families_[column_family_id];
}
#ifndef ROCKSDB_LITE
// Generated a list of keys that close to boundaries of SST keys.
// If there isn't any SST file in the DB, return empty list.
std::vector<std::string> GetWhiteBoxKeys(ThreadState* thread, DB* db,
ColumnFamilyHandle* cfh,
size_t num_keys);
#else // !ROCKSDB_LITE
std::vector<std::string> GetWhiteBoxKeys(ThreadState*, DB*,
ColumnFamilyHandle*, size_t) {
// Not supported in LITE mode.
return {};
}
#endif // !ROCKSDB_LITE
// Given a key K, this creates an iterator which scans to K and then
// does a random sequence of Next/Prev operations.
virtual Status TestIterate(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
// Enum used by VerifyIterator() to identify the mode to validate.
enum LastIterateOp {
kLastOpSeek,
kLastOpSeekForPrev,
kLastOpNextOrPrev,
kLastOpSeekToFirst,
kLastOpSeekToLast
};
// Compare the two iterator, iter and cmp_iter are in the same position,
// unless iter might be made invalidate or undefined because of
// upper or lower bounds, or prefix extractor.
// Will flag failure if the verification fails.
// diverged = true if the two iterator is already diverged.
// True if verification passed, false if not.
// op_logs is the information to print when validation fails.
void VerifyIterator(ThreadState* thread, ColumnFamilyHandle* cmp_cfh,
const ReadOptions& ro, Iterator* iter, Iterator* cmp_iter,
LastIterateOp op, const Slice& seek_key,
const std::string& op_logs, bool* diverged);
virtual Status TestBackupRestore(ThreadState* thread,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
virtual Status TestCheckpoint(ThreadState* thread,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
void TestCompactFiles(ThreadState* thread, ColumnFamilyHandle* column_family);
Status TestFlush(const std::vector<int>& rand_column_families);
Status TestPauseBackground(ThreadState* thread);
void TestAcquireSnapshot(ThreadState* thread, int rand_column_family,
const std::string& keystr, uint64_t i);
Status MaybeReleaseSnapshots(ThreadState* thread, uint64_t i);
#ifndef ROCKSDB_LITE
Status VerifyGetLiveFiles() const;
Status VerifyGetSortedWalFiles() const;
Status VerifyGetCurrentWalFile() const;
void TestGetProperty(ThreadState* thread) const;
virtual Status TestApproximateSize(
ThreadState* thread, uint64_t iteration,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
#endif // !ROCKSDB_LITE
virtual Status TestCustomOperations(
ThreadState* /*thread*/,
const std::vector<int>& /*rand_column_families*/) {
return Status::NotSupported("TestCustomOperations() must be overridden");
}
void VerificationAbort(SharedState* shared, std::string msg, Status s) const;
void VerificationAbort(SharedState* shared, std::string msg, int cf,
int64_t key) const;
void PrintEnv() const;
void Open(SharedState* shared);
void Reopen(ThreadState* thread);
virtual void RegisterAdditionalListeners() {}
#ifndef ROCKSDB_LITE
virtual void PrepareTxnDbOptions(SharedState* /*shared*/,
TransactionDBOptions& /*txn_db_opts*/) {}
#endif
void MaybeUseOlderTimestampForPointLookup(ThreadState* thread,
std::string& ts_str,
Slice& ts_slice,
ReadOptions& read_opts);
void MaybeUseOlderTimestampForRangeScan(ThreadState* thread,
std::string& ts_str, Slice& ts_slice,
ReadOptions& read_opts);
std::shared_ptr<Cache> cache_;
std::shared_ptr<Cache> compressed_cache_;
std::shared_ptr<const FilterPolicy> filter_policy_;
DB* db_;
#ifndef ROCKSDB_LITE
TransactionDB* txn_db_;
#endif
// Currently only used in MultiOpsTxnsStressTest
std::atomic<DB*> db_aptr_;
Options options_;
SystemClock* clock_;
std::vector<ColumnFamilyHandle*> column_families_;
std::vector<std::string> column_family_names_;
std::atomic<int> new_column_family_name_;
int num_times_reopened_;
std::unordered_map<std::string, std::vector<std::string>> options_table_;
std::vector<std::string> options_index_;
std::atomic<bool> db_preload_finished_;
// Fields used for continuous verification from another thread
DB* cmp_db_;
std::vector<ColumnFamilyHandle*> cmp_cfhs_;
bool is_db_stopped_;
};
// Load options from OPTIONS file and populate `options`.
extern bool InitializeOptionsFromFile(Options& options);
// Initialize `options` using command line arguments.
// When this function is called, `cache`, `block_cache_compressed`,
// `filter_policy` have all been initialized. Therefore, we just pass them as
// input arguments.
extern void InitializeOptionsFromFlags(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<Cache>& block_cache_compressed,
const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
// Initialize `options` on which `InitializeOptionsFromFile()` and
// `InitializeOptionsFromFlags()` have both been called already.
// There are two cases.
// Case 1: OPTIONS file is not specified. Command line arguments have been used
// to initialize `options`. InitializeOptionsGeneral() will use
// `cache`, `block_cache_compressed` and `filter_policy` to initialize
// corresponding fields of `options`. InitializeOptionsGeneral() will
// also set up other fields of `options` so that stress test can run.
// Examples include `create_if_missing` and
// `create_missing_column_families`, etc.
// Case 2: OPTIONS file is specified. It is possible that, after loading from
// the given OPTIONS files, some shared object fields are still not
// initialized because they are not set in the OPTIONS file. In this
// case, if command line arguments indicate that the user wants to set
// up such shared objects, e.g. block cache, compressed block cache,
// row cache, filter policy, then InitializeOptionsGeneral() will honor
// the user's choice, thus passing `cache`, `block_cache_compressed`,
// `filter_policy` as input arguments.
//
// InitializeOptionsGeneral() must not overwrite fields of `options` loaded
// from OPTIONS file.
extern void InitializeOptionsGeneral(
const std::shared_ptr<Cache>& cache,
const std::shared_ptr<Cache>& block_cache_compressed,
const std::shared_ptr<const FilterPolicy>& filter_policy, Options& options);
// If no OPTIONS file is specified, set up `options` so that we can test
// user-defined timestamp which requires `-user_timestamp_size=8`.
// This function also checks for known (currently) incompatible features with
// user-defined timestamp.
extern void CheckAndSetOptionsForUserTimestamp(Options& options);
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS