mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-01 07:15:51 +00:00
babe56ddba
Summary: Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working. `RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction, but rather performed according to the passed in `Env::IOPriority`. Now the compaction read rate limiting is supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`. There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases I believe it would be good to let user control the priority some day (e.g., file footer reads), and no TODO in cases I believe it doesn't matter (e.g., trace file reads). The API doc only lists the missing cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API to provide `ReadOptions` or `Env::IOPriority`, so I didn't count that as missing. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424 Test Plan: - new unit tests - new benchmarks on ~50MB database with 1MB/s read rate limit and 100ms refill interval; verified with strace reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart. 
- setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true` - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true` - crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10` Reviewed By: hx235 Differential Revision: D33747386 Pulled By: ajkr fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
1050 lines
33 KiB
C++
1050 lines
33 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#ifdef GFLAGS
|
|
#include "db_stress_tool/multi_ops_txns_stress.h"
|
|
|
|
#include "rocksdb/utilities/write_batch_with_index.h"
|
|
#include "util/defer.h"
|
|
#ifndef NDEBUG
|
|
#include "utilities/fault_injection_fs.h"
|
|
#endif // NDEBUG
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// TODO: move these to gflags.
|
|
// Exclusive upper bound for randomly chosen `c` values in this file (random
// draws are reduced modulo this constant); also passed to PreloadDb() as
// `num_c`.
static constexpr uint32_t kInitNumC = 1000;
|
|
#ifndef ROCKSDB_LITE
|
|
// NOTE(review): not referenced in the part of the file visible here;
// presumably consumed by the preload logic -- confirm.
static constexpr uint32_t kInitialCARatio = 3;
|
|
#endif // ROCKSDB_LITE
|
|
// When true, FinishInitDb() calls ReopenAndPreloadDb().
static constexpr bool kDoPreload = true;
|
|
|
|
// Builds the 8-byte primary index key for `a`: the primary index id followed
// by `a`. Each 32-bit field is written with EncodeFixed32() and then
// byte-reversed in place.
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) {
  constexpr size_t kFieldLen = 4;
  std::string key(2 * kFieldLen, '\0');
  char* const index_id_field = &key[0];
  char* const a_field = index_id_field + kFieldLen;

  EncodeFixed32(index_id_field, kPrimaryIndexId);
  EncodeFixed32(a_field, a);
  std::reverse(index_id_field, a_field);
  std::reverse(a_field, a_field + kFieldLen);
  return key;
}
|
|
|
|
// Builds the 8-byte secondary index key prefix for `c`: the secondary index id
// followed by `c`. Each 32-bit field is written with EncodeFixed32() and then
// byte-reversed in place.
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) {
  constexpr size_t kFieldLen = 4;
  std::string prefix(2 * kFieldLen, '\0');
  char* const index_id_field = &prefix[0];
  char* const c_field = index_id_field + kFieldLen;

  EncodeFixed32(index_id_field, kSecondaryIndexId);
  EncodeFixed32(c_field, c);
  std::reverse(index_id_field, c_field);
  std::reverse(c_field, c_field + kFieldLen);
  return prefix;
}
|
|
|
|
// Builds the full 12-byte secondary index key: the secondary index id, then
// `c`, then `a`. Each 32-bit field is written with EncodeFixed32() and then
// byte-reversed in place.
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c,
                                                               uint32_t a) {
  constexpr size_t kFieldLen = 4;
  std::string key(3 * kFieldLen, '\0');
  char* const index_id_field = &key[0];
  char* const c_field = index_id_field + kFieldLen;
  char* const a_field = c_field + kFieldLen;

  EncodeFixed32(index_id_field, kSecondaryIndexId);
  EncodeFixed32(c_field, c);
  EncodeFixed32(a_field, a);
  std::reverse(index_id_field, c_field);
  std::reverse(c_field, a_field);
  std::reverse(a_field, a_field + kFieldLen);
  return key;
}
|
|
|
|
std::tuple<Status, uint32_t, uint32_t>
|
|
MultiOpsTxnsStressTest::Record::DecodePrimaryIndexValue(
|
|
Slice primary_index_value) {
|
|
if (primary_index_value.size() != 8) {
|
|
return std::tuple<Status, uint32_t, uint32_t>{Status::Corruption(""), 0, 0};
|
|
}
|
|
uint32_t b = 0;
|
|
uint32_t c = 0;
|
|
if (!GetFixed32(&primary_index_value, &b) ||
|
|
!GetFixed32(&primary_index_value, &c)) {
|
|
assert(false);
|
|
return std::tuple<Status, uint32_t, uint32_t>{Status::Corruption(""), 0, 0};
|
|
}
|
|
return std::tuple<Status, uint32_t, uint32_t>{Status::OK(), b, c};
|
|
}
|
|
|
|
std::pair<Status, uint32_t>
|
|
MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexValue(
|
|
Slice secondary_index_value) {
|
|
if (secondary_index_value.size() != 4) {
|
|
return std::make_pair(Status::Corruption(""), 0);
|
|
}
|
|
uint32_t crc = 0;
|
|
bool result __attribute__((unused)) =
|
|
GetFixed32(&secondary_index_value, &crc);
|
|
assert(result);
|
|
return std::make_pair(Status::OK(), crc);
|
|
}
|
|
|
|
std::pair<std::string, std::string>
|
|
MultiOpsTxnsStressTest::Record::EncodePrimaryIndexEntry() const {
|
|
std::string primary_index_key = EncodePrimaryKey();
|
|
std::string primary_index_value = EncodePrimaryIndexValue();
|
|
return std::make_pair(primary_index_key, primary_index_value);
|
|
}
|
|
|
|
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey() const {
|
|
return EncodePrimaryKey(a_);
|
|
}
|
|
|
|
// Encodes the 8-byte primary index value: `b_` followed by `c_`, each written
// with EncodeFixed32() (no byte reversal, unlike the key encoders).
std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const {
  constexpr size_t kFieldLen = 4;
  std::string value(2 * kFieldLen, '\0');
  char* const b_field = &value[0];
  EncodeFixed32(b_field, b_);
  EncodeFixed32(b_field + kFieldLen, c_);
  return value;
}
|
|
|
|
std::pair<std::string, std::string>
|
|
MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const {
|
|
std::string secondary_index_key;
|
|
char buf[12];
|
|
EncodeFixed32(buf, kSecondaryIndexId);
|
|
std::reverse(buf, buf + 4);
|
|
EncodeFixed32(buf + 4, c_);
|
|
EncodeFixed32(buf + 8, a_);
|
|
std::reverse(buf + 4, buf + 8);
|
|
std::reverse(buf + 8, buf + 12);
|
|
secondary_index_key.assign(buf, sizeof(buf));
|
|
|
|
// Secondary index value is always 4-byte crc32 of the secondary key
|
|
std::string secondary_index_value;
|
|
uint32_t crc = crc32c::Value(buf, sizeof(buf));
|
|
PutFixed32(&secondary_index_value, crc);
|
|
return std::make_pair(secondary_index_key, secondary_index_value);
|
|
}
|
|
|
|
std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const {
|
|
char buf[12];
|
|
EncodeFixed32(buf, kSecondaryIndexId);
|
|
std::reverse(buf, buf + 4);
|
|
EncodeFixed32(buf + 4, c_);
|
|
EncodeFixed32(buf + 8, a_);
|
|
std::reverse(buf + 4, buf + 8);
|
|
std::reverse(buf + 8, buf + 12);
|
|
return std::string(buf, sizeof(buf));
|
|
}
|
|
|
|
// Populates this record from a primary index {key, value} pair.
//
// The key must be 8 bytes: a 32-bit index id followed by 32-bit `a`, both
// stored most-significant byte first. The value must be 8 bytes holding `b`
// then `c`, read with GetFixed32().
//
// Returns Corruption when the key length is wrong (also asserts in debug
// builds), when the index id is not kPrimaryIndexId, or when the value length
// is wrong. Note that `a_` is already overwritten by the time the value length
// is checked.
Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry(
    Slice primary_index_key, Slice primary_index_value) {
  if (primary_index_key.size() != 8) {
    assert(false);
    return Status::Corruption("Primary index key length is not 8");
  }

  // Reads four bytes starting at `p`, most-significant byte first.
  const auto read_be32 = [](const char* p) {
    uint32_t v = 0;
    for (int i = 0; i < 4; ++i) {
      v = (v << 8) | static_cast<uint32_t>(static_cast<unsigned char>(p[i]));
    }
    return v;
  };

  const uint32_t index_id = read_be32(primary_index_key.data());
  primary_index_key.remove_prefix(sizeof(uint32_t));
  if (index_id != kPrimaryIndexId) {
    std::ostringstream oss;
    oss << "Unexpected primary index id: " << index_id;
    return Status::Corruption(oss.str());
  }

  a_ = read_be32(primary_index_key.data());

  if (primary_index_value.size() != 8) {
    return Status::Corruption("Primary index value length is not 8");
  }
  GetFixed32(&primary_index_value, &b_);
  GetFixed32(&primary_index_value, &c_);
  return Status::OK();
}
|
|
|
|
// Populates `c_` and `a_` from a secondary index {key, value} pair and checks
// the entry's checksum.
//
// The key must be 12 bytes: 32-bit index id, 32-bit `c`, 32-bit `a`, each
// stored most-significant byte first. The value must be 4 bytes and equal the
// crc32c of the whole 12-byte key.
//
// Returns Corruption when the key length is wrong, the index id is not
// kSecondaryIndexId, the value length is wrong, or the checksum differs. Note
// that `c_` and `a_` are already overwritten by the time the value is checked.
Status MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexEntry(
    Slice secondary_index_key, Slice secondary_index_value) {
  if (secondary_index_key.size() != 12) {
    return Status::Corruption("Secondary index key length is not 12");
  }
  // Checksum covers the full key, so compute it before stripping the prefix.
  const uint32_t expected_crc =
      crc32c::Value(secondary_index_key.data(), secondary_index_key.size());

  // Reads four bytes starting at `p`, most-significant byte first.
  const auto read_be32 = [](const char* p) {
    uint32_t v = 0;
    for (int i = 0; i < 4; ++i) {
      v = (v << 8) | static_cast<uint32_t>(static_cast<unsigned char>(p[i]));
    }
    return v;
  };

  const uint32_t index_id = read_be32(secondary_index_key.data());
  secondary_index_key.remove_prefix(sizeof(uint32_t));
  if (index_id != kSecondaryIndexId) {
    std::ostringstream oss;
    oss << "Unexpected secondary index id: " << index_id;
    return Status::Corruption(oss.str());
  }

  const char* const fields = secondary_index_key.data();
  assert(secondary_index_key.size() == 8);
  c_ = read_be32(fields);
  a_ = read_be32(fields + 4);

  if (secondary_index_value.size() != 4) {
    return Status::Corruption("Secondary index value length is not 4");
  }
  uint32_t stored_crc = 0;
  GetFixed32(&secondary_index_value, &stored_crc);
  if (stored_crc == expected_crc) {
    return Status::OK();
  }
  std::ostringstream oss;
  oss << "Secondary index key checksum mismatch, stored: " << stored_crc
      << ", recomputed: " << expected_crc;
  return Status::Corruption(oss.str());
}
|
|
|
|
// Runs the optional preload step: when kDoPreload is set, reopens the database
// and preloads it via ReopenAndPreloadDb(). Compaction filter support is not
// implemented yet.
void MultiOpsTxnsStressTest::FinishInitDb(SharedState* shared) {
  if (FLAGS_enable_compaction_filter) {
    // TODO (yanqin) enable compaction filter
  }
  if (!kDoPreload) {
    return;
  }
  ReopenAndPreloadDb(shared);
}
|
|
|
|
// Tears down the current database handle, reopens the database as a
// TransactionDB with concurrency control skipped, preloads it with
// PreloadDb(shared, kInitNumC), closes it again, and finally reopens it through
// Open(). Exits the process if the intermediate open or the close fails.
// No-op in ROCKSDB_LITE builds.
void MultiOpsTxnsStressTest::ReopenAndPreloadDb(SharedState* shared) {
  (void)shared;
#ifndef ROCKSDB_LITE
  // Remember the column families before their handles are destroyed.
  std::vector<ColumnFamilyDescriptor> cf_descs;
  for (const auto* cfh : column_families_) {
    cf_descs.emplace_back(cfh->GetName(), ColumnFamilyOptions(options_));
  }

  // Stops background work and releases every column family handle.
  const auto stop_and_release_handles = [this]() {
    CancelAllBackgroundWork(db_, /*wait=*/true);
    for (auto* cfh : column_families_) {
      delete cfh;
    }
    column_families_.clear();
  };
  // Destroys the database object and clears both pointers to it.
  const auto destroy_db = [this]() {
    delete db_;
    db_ = nullptr;
    txn_db_ = nullptr;
  };

  stop_and_release_handles();
  destroy_db();

  TransactionDBOptions txn_db_opts;
  txn_db_opts.skip_concurrency_control = true;  // speed-up preloading
  Status s = TransactionDB::Open(options_, txn_db_opts, FLAGS_db, cf_descs,
                                 &column_families_, &txn_db_);
  if (!s.ok()) {
    fprintf(stderr, "Failed to open db: %s\n", s.ToString().c_str());
    exit(1);
  }
  db_ = txn_db_;

  PreloadDb(shared, kInitNumC);

  // Reopen
  stop_and_release_handles();
  s = db_->Close();
  if (!s.ok()) {
    fprintf(stderr, "Error during closing db: %s\n", s.ToString().c_str());
    exit(1);
  }
  destroy_db();

  Open();
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Used for point-lookup transaction
|
|
Status MultiOpsTxnsStressTest::TestGet(
|
|
ThreadState* thread, const ReadOptions& read_opts,
|
|
const std::vector<int>& /*rand_column_families*/,
|
|
const std::vector<int64_t>& /*rand_keys*/) {
|
|
uint32_t a = ChooseA(thread);
|
|
return PointLookupTxn(thread, read_opts, a);
|
|
}
|
|
|
|
// Not used.
|
|
std::vector<Status> MultiOpsTxnsStressTest::TestMultiGet(
|
|
ThreadState* /*thread*/, const ReadOptions& /*read_opts*/,
|
|
const std::vector<int>& /*rand_column_families*/,
|
|
const std::vector<int64_t>& /*rand_keys*/) {
|
|
return std::vector<Status>{Status::NotSupported()};
|
|
}
|
|
|
|
// Placeholder: performs no work and reports success. All arguments are
// ignored.
Status MultiOpsTxnsStressTest::TestPrefixScan(
    ThreadState* /*thread*/, const ReadOptions& /*read_opts*/,
    const std::vector<int>& /*rand_column_families*/,
    const std::vector<int64_t>& /*rand_keys*/) {
  return Status::OK();
}
|
|
|
|
// Given a key K, this creates an iterator which scans to K and then
|
|
// does a random sequence of Next/Prev operations.
|
|
Status MultiOpsTxnsStressTest::TestIterate(
|
|
ThreadState* thread, const ReadOptions& read_opts,
|
|
const std::vector<int>& /*rand_column_families*/,
|
|
const std::vector<int64_t>& /*rand_keys*/) {
|
|
uint32_t c = thread->rand.Next() % kInitNumC;
|
|
return RangeScanTxn(thread, read_opts, c);
|
|
}
|
|
|
|
// Not intended for use.
|
|
Status MultiOpsTxnsStressTest::TestPut(ThreadState* /*thread*/,
|
|
WriteOptions& /*write_opts*/,
|
|
const ReadOptions& /*read_opts*/,
|
|
const std::vector<int>& /*cf_ids*/,
|
|
const std::vector<int64_t>& /*keys*/,
|
|
char (&value)[100],
|
|
std::unique_ptr<MutexLock>& /*lock*/) {
|
|
(void)value;
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
// Not intended for use.
|
|
Status MultiOpsTxnsStressTest::TestDelete(
|
|
ThreadState* /*thread*/, WriteOptions& /*write_opts*/,
|
|
const std::vector<int>& /*rand_column_families*/,
|
|
const std::vector<int64_t>& /*rand_keys*/,
|
|
std::unique_ptr<MutexLock>& /*lock*/) {
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
// Not intended for use.
|
|
Status MultiOpsTxnsStressTest::TestDeleteRange(
|
|
ThreadState* /*thread*/, WriteOptions& /*write_opts*/,
|
|
const std::vector<int>& /*rand_column_families*/,
|
|
const std::vector<int64_t>& /*rand_keys*/,
|
|
std::unique_ptr<MutexLock>& /*lock*/) {
|
|
return Status::NotSupported();
|
|
}
|
|
|
|
void MultiOpsTxnsStressTest::TestIngestExternalFile(
|
|
ThreadState* thread, const std::vector<int>& rand_column_families,
|
|
const std::vector<int64_t>& /*rand_keys*/,
|
|
std::unique_ptr<MutexLock>& /*lock*/) {
|
|
// TODO (yanqin)
|
|
(void)thread;
|
|
(void)rand_column_families;
|
|
}
|
|
|
|
void MultiOpsTxnsStressTest::TestCompactRange(
|
|
ThreadState* thread, int64_t /*rand_key*/, const Slice& /*start_key*/,
|
|
ColumnFamilyHandle* column_family) {
|
|
// TODO (yanqin).
|
|
// May use GetRangeHash() for validation before and after DB::CompactRange()
|
|
// completes.
|
|
(void)thread;
|
|
(void)column_family;
|
|
}
|
|
|
|
// Placeholder: backup/restore is not exercised yet; performs no work and
// reports success. All arguments are ignored.
Status MultiOpsTxnsStressTest::TestBackupRestore(
    ThreadState* /*thread*/, const std::vector<int>& /*rand_column_families*/,
    const std::vector<int64_t>& /*rand_keys*/) {
  // TODO (yanqin)
  return Status::OK();
}
|
|
|
|
// Placeholder: checkpointing is not exercised yet; performs no work and
// reports success. All arguments are ignored.
Status MultiOpsTxnsStressTest::TestCheckpoint(
    ThreadState* /*thread*/, const std::vector<int>& /*rand_column_families*/,
    const std::vector<int64_t>& /*rand_keys*/) {
  // TODO (yanqin)
  return Status::OK();
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Placeholder: approximate-size queries are not exercised yet; performs no
// work and reports success. All arguments are ignored.
Status MultiOpsTxnsStressTest::TestApproximateSize(
    ThreadState* /*thread*/, uint64_t /*iteration*/,
    const std::vector<int>& /*rand_column_families*/,
    const std::vector<int64_t>& /*rand_keys*/) {
  // TODO (yanqin)
  return Status::OK();
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
// Executes one randomly selected write transaction:
//   0 - move a record to a freshly generated primary key,
//   1 - change the secondary key `c` of the records that currently have a
//       randomly drawn `old_c`,
//   2 - bump `b` in the primary index value of a chosen record.
// `rand_column_families` is ignored. Returns the status of the transaction
// that was run.
Status MultiOpsTxnsStressTest::TestCustomOperations(
    ThreadState* thread, const std::vector<int>& rand_column_families) {
  (void)rand_column_families;
  // Randomly choose from 0, 1, and 2.
  // TODO (yanqin) allow user to configure probability of each operation.
  const uint32_t rand = thread->rand.Uniform(3);
  Status s;
  if (0 == rand) {
    // Update primary key.
    uint32_t old_a = ChooseA(thread);
    uint32_t new_a = GenerateNextA();
    s = PrimaryKeyUpdateTxn(thread, old_a, new_a);
  } else if (1 == rand) {
    // Update secondary key.
    const uint32_t old_c = thread->rand.Next() % kInitNumC;
    // Draw until we get a value different from old_c, giving up after a
    // bounded number of attempts.
    constexpr int kMaxAttempts = 100;
    uint32_t new_c = old_c;
    for (int attempt = 0; attempt < kMaxAttempts && new_c == old_c;
         ++attempt) {
      new_c = thread->rand.Next() % kInitNumC;
    }
    if (new_c == old_c) {
      // Every attempt collided. If we reach here, it means our random number
      // generator has a serious problem, or kInitNumC is chosen poorly.
      // (Testing the outcome rather than the attempt count avoids aborting
      // when the very last attempt succeeded.)
      std::terminate();
    }
    s = SecondaryKeyUpdateTxn(thread, old_c, new_c);
  } else if (2 == rand) {
    // Update primary index value.
    uint32_t a = ChooseA(thread);
    s = UpdatePrimaryIndexValueTxn(thread, a, /*b_delta=*/1);
  } else {
    // Should never reach here.
    assert(false);
  }
  return s;
}
|
|
|
|
// Moves the record with primary key `old_a` to primary key `new_a` inside one
// transaction:
//   1. GetForUpdate(old_pk) to read (and lock) the existing record,
//   2. GetForUpdate(new_pk) to lock the target key, which must not exist,
//   3. Delete(old_pk) and Put(new_pk, value) in the primary index,
//   4. SingleDelete the old secondary key and Put the new one through the
//      transaction's write batch,
//   5. commit.
//
// Returns OK on commit; NotFound if `old_a` does not exist; Busy if `new_a`
// already exists; otherwise the first error encountered. On any non-OK result
// the transaction is rolled back and per-thread stats are updated by the
// deferred cleanup. Returns NotSupported in ROCKSDB_LITE builds.
Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
                                                   uint32_t old_a,
                                                   uint32_t new_a) {
#ifdef ROCKSDB_LITE
  (void)thread;
  (void)old_a;
  (void)new_a;
  return Status::NotSupported();
#else
  std::string old_pk = Record::EncodePrimaryKey(old_a);
  std::string new_pk = Record::EncodePrimaryKey(new_a);
  Transaction* txn = nullptr;
  WriteOptions wopts;
  Status s = NewTxn(wopts, &txn);
  if (!s.ok()) {
    assert(!txn);
    thread->stats.AddErrors(1);
    return s;
  }

  assert(txn);
  txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr);

  // Runs on every exit path below and inspects the final value of `s`, so
  // each early return must leave the status it wants reported in `s`.
  const Defer cleanup([&s, thread, txn, this]() {
    if (s.ok()) {
      // Two gets, one for existing pk, one for locking potential new pk.
      thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1);
      thread->stats.AddDeletes(1);
      thread->stats.AddBytesForWrites(
          /*nwrites=*/2,
          Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize);
      thread->stats.AddSingleDeletes(1);
      return;
    }
    if (s.IsNotFound()) {
      thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
    } else if (s.IsBusy()) {
      // ignore.
    } else {
      thread->stats.AddErrors(1);
    }
    RollbackTxn(txn).PermitUncheckedError();
  });

  ReadOptions ropts;
  ropts.rate_limiter_priority =
      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  std::string value;
  s = txn->GetForUpdate(ropts, old_pk, &value);
  if (!s.ok()) {
    return s;
  }
  std::string empty_value;
  s = txn->GetForUpdate(ropts, new_pk, &empty_value);
  if (s.ok()) {
    // The target key already exists: report it as a conflict.
    assert(!empty_value.empty());
    s = Status::Busy();
    return s;
  } else if (!s.IsNotFound()) {
    // Only NotFound means "the key is absent and now tracked by this
    // transaction". Any other failure means new_pk could not be read/locked,
    // so the assume_tracked writes below must not be attempted.
    return s;
  }

  auto result = Record::DecodePrimaryIndexValue(value);
  s = std::get<0>(result);
  if (!s.ok()) {
    return s;
  }
  uint32_t b = std::get<1>(result);
  uint32_t c = std::get<2>(result);

  ColumnFamilyHandle* cf = db_->DefaultColumnFamily();
  s = txn->Delete(cf, old_pk, /*assume_tracked=*/true);
  if (!s.ok()) {
    return s;
  }
  s = txn->Put(cf, new_pk, value, /*assume_tracked=*/true);
  if (!s.ok()) {
    return s;
  }

  // Secondary index entries are written straight into the write batch.
  auto* wb = txn->GetWriteBatch();
  assert(wb);

  std::string old_sk = Record::EncodeSecondaryKey(c, old_a);
  s = wb->SingleDelete(old_sk);
  if (!s.ok()) {
    return s;
  }

  Record record(new_a, b, c);
  std::string new_sk;
  std::string new_crc;
  std::tie(new_sk, new_crc) = record.EncodeSecondaryIndexEntry();
  s = wb->Put(new_sk, new_crc);
  if (!s.ok()) {
    return s;
  }

  s = CommitTxn(txn);
  return s;
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Changes the secondary key from `old_c` to `new_c` for every record that the
// secondary index currently lists under `old_c`, inside one transaction.
//
// For each secondary index entry with prefix `old_c` the transaction:
//   - decodes the entry (which also verifies its checksum),
//   - reads the matching primary index entry with GetForUpdate(),
//   - verifies that the primary index agrees that c == old_c,
//   - rewrites the primary index value with `new_c`,
//   - SingleDeletes the old secondary key and Puts the new one via the
//     transaction's write batch.
//
// Returns OK on commit; NotFound if no entry has prefix `old_c`; the iterator's
// error if the scan itself failed; otherwise the first error encountered.
// Inconsistencies between the two indexes are reported through
// VerificationAbort(). On any non-OK result the transaction is rolled back by
// the deferred cleanup. Returns NotSupported in ROCKSDB_LITE builds.
Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
                                                     uint32_t old_c,
                                                     uint32_t new_c) {
#ifdef ROCKSDB_LITE
  (void)thread;
  (void)old_c;
  (void)new_c;
  return Status::NotSupported();
#else
  Transaction* txn = nullptr;
  WriteOptions wopts;
  Status s = NewTxn(wopts, &txn);
  if (!s.ok()) {
    assert(!txn);
    thread->stats.AddErrors(1);
    return s;
  }

  assert(txn);

  Iterator* it = nullptr;
  long iterations = 0;
  // Runs on every exit path below and inspects the final value of `s`; it also
  // owns the iterator, which is captured by reference because it is created
  // later.
  const Defer cleanup([&s, thread, &it, txn, this, &iterations]() {
    delete it;
    if (s.ok()) {
      thread->stats.AddIterations(iterations);
      thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
      thread->stats.AddSingleDeletes(1);
      thread->stats.AddBytesForWrites(
          /*nwrites=*/2,
          Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize);
      return;
    } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
               s.IsMergeInProgress()) {
      // ww-conflict detected, or
      // lock cannot be acquired, or
      // memtable history is not large enough for conflict checking, or
      // Merge operation cannot be resolved.
      // TODO (yanqin) add stats for other cases?
    } else if (s.IsNotFound()) {
      // ignore.
    } else {
      thread->stats.AddErrors(1);
    }
    RollbackTxn(txn).PermitUncheckedError();
  });

  // TODO (yanqin) try SetSnapshotOnNextOperation(). We currently need to take
  // a snapshot here because we will later verify that point lookup in the
  // primary index using GetForUpdate() returns the same value for 'c' as the
  // iterator. The iterator does not need a snapshot though, because it will be
  // assigned the current latest (published) sequence in the db, which will be
  // no smaller than the snapshot created here. The GetForUpdate will perform
  // ww conflict checking to ensure GetForUpdate() (using the snapshot) sees
  // the same data as this iterator.
  txn->SetSnapshot();
  std::string old_sk_prefix = Record::EncodeSecondaryKey(old_c);
  std::string iter_ub_str = Record::EncodeSecondaryKey(old_c + 1);
  Slice iter_ub = iter_ub_str;
  ReadOptions ropts;
  if (thread->rand.OneIn(2)) {
    ropts.snapshot = txn->GetSnapshot();
  }
  ropts.total_order_seek = true;
  ropts.iterate_upper_bound = &iter_ub;
  ropts.rate_limiter_priority =
      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  it = txn->GetIterator(ropts);

  assert(it);
  it->Seek(old_sk_prefix);
  if (!it->Valid()) {
    // An invalid iterator means either "no such entry" or "the seek failed";
    // only the former is NotFound.
    s = it->status();
    if (s.ok()) {
      s = Status::NotFound();
    }
    return s;
  }
  auto* wb = txn->GetWriteBatch();
  assert(wb);

  do {
    ++iterations;
    Record record;
    s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
    if (!s.ok()) {
      VerificationAbort(thread->shared, "Cannot decode secondary key", s);
      break;
    }
    // At this point, record.b is not known yet, thus we need to access
    // primary index.
    std::string pk = Record::EncodePrimaryKey(record.a_value());
    std::string value;
    ReadOptions read_opts;
    read_opts.rate_limiter_priority =
        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
    read_opts.snapshot = txn->GetSnapshot();
    s = txn->GetForUpdate(read_opts, pk, &value);
    if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
        s.IsMergeInProgress()) {
      // Write conflict, or cannot acquire lock, or memtable size is not large
      // enough, or merge cannot be resolved.
      break;
    } else if (!s.ok()) {
      // We can also fail verification here.
      VerificationAbort(thread->shared, "pk should exist, but does not", s);
      break;
    }
    auto result = Record::DecodePrimaryIndexValue(value);
    s = std::get<0>(result);
    if (!s.ok()) {
      VerificationAbort(thread->shared, "Cannot decode primary index value", s);
      break;
    }
    uint32_t b = std::get<1>(result);
    uint32_t c = std::get<2>(result);
    if (c != old_c) {
      std::ostringstream oss;
      oss << "c in primary index does not match secondary index: " << c
          << " != " << old_c;
      s = Status::Corruption();
      VerificationAbort(thread->shared, oss.str(), s);
      break;
    }
    Record new_rec(record.a_value(), b, new_c);
    std::string new_primary_index_value = new_rec.EncodePrimaryIndexValue();
    ColumnFamilyHandle* cf = db_->DefaultColumnFamily();
    s = txn->Put(cf, pk, new_primary_index_value, /*assume_tracked=*/true);
    if (!s.ok()) {
      break;
    }
    std::string old_sk = it->key().ToString(/*hex=*/false);
    std::string new_sk;
    std::string new_crc;
    std::tie(new_sk, new_crc) = new_rec.EncodeSecondaryIndexEntry();
    s = wb->SingleDelete(old_sk);
    if (!s.ok()) {
      break;
    }
    s = wb->Put(new_sk, new_crc);
    if (!s.ok()) {
      break;
    }

    it->Next();
  } while (it->Valid());

  if (s.ok()) {
    // The loop ended because the iterator became invalid. Make sure that was
    // the end of the range and not a read error, so that a partially scanned
    // range is never committed.
    s = it->status();
  }
  if (!s.ok()) {
    return s;
  }

  s = CommitTxn(txn);

  return s;
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Adds `b_delta` to field `b` of the record with primary key `a`, inside one
// transaction: GetForUpdate() the primary index entry, decode it, re-encode it
// with the new `b`, Put() it back and commit.
//
// Returns OK on commit; NotFound if `a` does not exist; Corruption if the
// stored value cannot be decoded; otherwise the first error encountered. On
// any non-OK result the transaction is rolled back and per-thread stats are
// updated by the deferred cleanup. Returns NotSupported in ROCKSDB_LITE
// builds.
Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
                                                          uint32_t a,
                                                          uint32_t b_delta) {
#ifdef ROCKSDB_LITE
  (void)thread;
  (void)a;
  (void)b_delta;
  return Status::NotSupported();
#else
  std::string pk_str = Record::EncodePrimaryKey(a);
  Transaction* txn = nullptr;
  WriteOptions wopts;
  Status s = NewTxn(wopts, &txn);
  if (!s.ok()) {
    assert(!txn);
    thread->stats.AddErrors(1);
    return s;
  }

  assert(txn);

  // Runs on every exit path below and inspects the final value of `s`, so
  // each early return must leave the status it wants reported in `s`.
  const Defer cleanup([&s, thread, txn, this]() {
    if (s.ok()) {
      thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
      thread->stats.AddBytesForWrites(
          /*nwrites=*/1, /*nbytes=*/Record::kPrimaryIndexEntrySize);
      return;
    }
    if (s.IsNotFound()) {
      thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
    } else if (s.IsInvalidArgument()) {
      // ignored.
    } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
               s.IsMergeInProgress()) {
      // ignored.
    } else {
      thread->stats.AddErrors(1);
    }
    RollbackTxn(txn).PermitUncheckedError();
  });
  ReadOptions ropts;
  ropts.rate_limiter_priority =
      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
  std::string value;
  s = txn->GetForUpdate(ropts, pk_str, &value);
  if (!s.ok()) {
    return s;
  }
  auto result = Record::DecodePrimaryIndexValue(value);
  // Propagate the decode status through `s`: returning the (still OK) status
  // of GetForUpdate() here would report a corrupted value as success and skip
  // the rollback in `cleanup`.
  s = std::get<0>(result);
  if (!s.ok()) {
    return s;
  }
  uint32_t b = std::get<1>(result) + b_delta;
  uint32_t c = std::get<2>(result);
  Record record(a, b, c);
  std::string primary_index_value = record.EncodePrimaryIndexValue();
  ColumnFamilyHandle* cf = db_->DefaultColumnFamily();
  s = txn->Put(cf, pk_str, primary_index_value, /*assume_tracked=*/true);
  if (!s.ok()) {
    return s;
  }
  s = CommitTxn(txn);
  return s;
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Looks up primary key `a` with Get() inside a transaction and commits the
// transaction if the key was found.
//
// Returns the status of the commit when the key exists, otherwise the status
// of the Get() (NotFound when the key is absent). On any non-OK result the
// transaction is rolled back and per-thread stats are updated by the deferred
// cleanup. Returns NotSupported in ROCKSDB_LITE builds.
Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread,
                                              ReadOptions ropts, uint32_t a) {
#ifdef ROCKSDB_LITE
  (void)thread;
  (void)ropts;
  (void)a;
  return Status::NotSupported();
#else
  const std::string primary_key = Record::EncodePrimaryKey(a);
  // pk may or may not exist
  PinnableSlice found_value;

  WriteOptions write_opts;
  Transaction* txn = nullptr;
  Status s = NewTxn(write_opts, &txn);
  if (!s.ok()) {
    assert(!txn);
    thread->stats.AddErrors(1);
    return s;
  }

  assert(txn);

  // Runs on every exit path below and inspects the final value of `s`.
  // NOTE(review): sibling transactions finish with CommitTxn(txn), whereas
  // this one calls txn->Commit() directly and does not hand `txn` to any
  // helper on success -- confirm who releases `txn` on that path.
  const Defer cleanup([&s, thread, txn, this]() {
    if (!s.ok()) {
      if (s.IsNotFound()) {
        thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0);
      } else {
        thread->stats.AddErrors(1);
      }
      RollbackTxn(txn).PermitUncheckedError();
      return;
    }
    thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1);
  });

  s = txn->Get(ropts, db_->DefaultColumnFamily(), primary_key, &found_value);
  if (!s.ok()) {
    return s;
  }
  s = txn->Commit();
  return s;
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Creates a transaction iterator, seeks it to the secondary key prefix for `c`
// and commits the transaction if the iterator reports no error.
//
// Returns the iterator's status if the seek failed, otherwise the status of
// the commit. On any non-OK result an error is counted and the transaction is
// rolled back by the deferred cleanup. Returns NotSupported in ROCKSDB_LITE
// builds.
Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread,
                                            ReadOptions ropts, uint32_t c) {
#ifdef ROCKSDB_LITE
  (void)thread;
  (void)ropts;
  (void)c;
  return Status::NotSupported();
#else
  const std::string seek_target = Record::EncodeSecondaryKey(c);

  WriteOptions write_opts;
  Transaction* txn = nullptr;
  Status s = NewTxn(write_opts, &txn);
  if (!s.ok()) {
    assert(!txn);
    thread->stats.AddErrors(1);
    return s;
  }

  assert(txn);

  // Runs on every exit path below and inspects the final value of `s`.
  // NOTE(review): sibling transactions finish with CommitTxn(txn), whereas
  // this one calls txn->Commit() directly and does not hand `txn` to any
  // helper on success -- confirm who releases `txn` on that path.
  const Defer cleanup([&s, thread, txn, this]() {
    if (!s.ok()) {
      thread->stats.AddErrors(1);
      RollbackTxn(txn).PermitUncheckedError();
      return;
    }
    thread->stats.AddIterations(1);
  });
  // Declared after `cleanup` so that the iterator is destroyed first.
  std::unique_ptr<Iterator> iter(txn->GetIterator(ropts));
  iter->Seek(seek_target);
  s = iter->status();
  if (s.ok()) {
    s = txn->Commit();
  }
  // TODO (yanqin) more Seek/SeekForPrev/Next/Prev/SeekToFirst/SeekToLast
  return s;
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Checks that the primary and secondary indexes agree, reading both through
// one snapshot:
//   1. counts the entries in the primary index,
//   2. walks the secondary index; each entry must decode (this includes its
//      checksum), its primary key must exist, and the `c` stored in the
//      primary index must equal the `c` in the secondary key,
//   3. requires both indexes to contain the same number of entries.
// The first inconsistency found is reported through VerificationAbort() and
// ends the verification. Does nothing if a verification failure has already
// been recorded.
void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
  if (thread->shared->HasVerificationFailedYet()) {
    return;
  }
  const Snapshot* const snapshot = db_->GetSnapshot();
  assert(snapshot);
  ManagedSnapshot snapshot_guard(db_, snapshot);

  // TODO (yanqin) with a probability, we can use either forward or backward
  // iterator in subsequent checks. We can also use more advanced features in
  // range scan. For now, let's just use simple forward iteration with
  // total_order_seek = true.

  // First, iterate primary index.
  size_t primary_index_entries_count = 0;
  {
    // Upper bound: the byte-reversed 4-byte encoding of the next index id.
    char buf[4];
    EncodeFixed32(buf, Record::kPrimaryIndexId + 1);
    std::reverse(buf, buf + sizeof(buf));
    std::string iter_ub_str(buf, sizeof(buf));
    Slice iter_ub = iter_ub_str;

    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions ropts;
    ropts.snapshot = snapshot;
    ropts.total_order_seek = true;
    ropts.iterate_upper_bound = &iter_ub;

    std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
    for (it->SeekToFirst(); it->Valid(); it->Next()) {
      ++primary_index_entries_count;
    }
  }

  // Second, iterate secondary index.
  size_t secondary_index_entries_count = 0;
  {
    char buf[4];
    EncodeFixed32(buf, Record::kSecondaryIndexId);
    std::reverse(buf, buf + sizeof(buf));
    const std::string start_key(buf, sizeof(buf));

    // This `ReadOptions` is for validation purposes. Ignore
    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
    ReadOptions ropts;
    ropts.snapshot = snapshot;
    ropts.total_order_seek = true;

    std::unique_ptr<Iterator> it(db_->NewIterator(ropts));
    for (it->Seek(start_key); it->Valid(); it->Next()) {
      ++secondary_index_entries_count;
      Record record;
      Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value());
      if (!s.ok()) {
        VerificationAbort(thread->shared, "Cannot decode secondary index entry",
                          s);
        return;
      }
      // After decoding secondary index entry, we know a and c. Crc is verified
      // in decoding phase.
      //
      // Form a primary key and search in the primary index.
      std::string pk = Record::EncodePrimaryKey(record.a_value());
      std::string value;
      s = db_->Get(ropts, pk, &value);
      if (!s.ok()) {
        std::ostringstream oss;
        oss << "Error searching pk " << Slice(pk).ToString(true) << ". "
            << s.ToString();
        VerificationAbort(thread->shared, oss.str(), s);
        return;
      }
      auto result = Record::DecodePrimaryIndexValue(value);
      s = std::get<0>(result);
      if (!s.ok()) {
        std::ostringstream oss;
        oss << "Error decoding primary index value "
            << Slice(value).ToString(true) << ". " << s.ToString();
        VerificationAbort(thread->shared, oss.str(), s);
        // The decoded fields are meaningless after a failure; do not go on to
        // compare them.
        return;
      }
      uint32_t c_in_primary = std::get<2>(result);
      if (c_in_primary != record.c_value()) {
        std::ostringstream oss;
        oss << "Pk/sk mismatch. pk: (c=" << c_in_primary
            << "), sk: (c=" << record.c_value() << ")";
        VerificationAbort(thread->shared, oss.str(), s);
        // Stop at the first inconsistency, like the other failure paths above.
        return;
      }
    }
  }

  if (secondary_index_entries_count != primary_index_entries_count) {
    std::ostringstream oss;
    oss << "Pk/sk mismatch: primary index has " << primary_index_entries_count
        << " entries. Secondary index has " << secondary_index_entries_count
        << " entries.";
    VerificationAbort(thread->shared, oss.str(), Status::OK());
  }
}
|
|
|
|
// Picks a value of `a` using the random generator owned by `thread`.
//
// Three outcomes are possible, decided by successive random draws:
//   1. `Uniform(5)` yields 0: return the value just below the current
//      `next_a_`.
//   2. `OneIn(3)` succeeds: return a random value reduced modulo the first
//      observed `next_a_`.
//   3. Otherwise: return the midpoint between two successive reads of
//      `next_a_`.
uint32_t MultiOpsTxnsStressTest::ChooseA(ThreadState* thread) {
  const uint32_t selector = thread->rand.Uniform(5);
  const uint32_t lower = next_a_.load(std::memory_order_relaxed);
  assert(lower != 0);
  if (selector == 0) {
    return lower - 1;
  }

  // The candidate is drawn before the `OneIn(3)` check so that the sequence
  // of random numbers consumed is the same on every path.
  const uint32_t candidate = thread->rand.Next() % lower;
  if (!thread->rand.OneIn(3)) {
    // Re-read the counter; it may have advanced since the first load.
    const uint32_t upper = next_a_.load(std::memory_order_relaxed);
    // A higher chance that this a still exists.
    return lower + (upper - lower) / 2;
  }
  return candidate;
}
|
|
|
|
uint32_t MultiOpsTxnsStressTest::GenerateNextA() {
|
|
return next_a_.fetch_add(1, std::memory_order_relaxed);
|
|
}
|
|
|
|
// Populates the database with initial records before the stress test runs.
//
// For every `c` in [0, num_c), `kInitialCARatio` consecutive values of `a`
// are written. Each record gets a random `b` (seeded from `shared`) and is
// stored as a primary index entry plus a secondary index entry in a single
// write batch. The WAL is disabled for these writes and the DB is flushed at
// the end. Finally `next_a_` is set to `(num_c + 1) * kInitialCARatio`.
//
// In ROCKSDB_LITE builds this function does nothing.
void MultiOpsTxnsStressTest::PreloadDb(SharedState* shared, size_t num_c) {
#ifdef ROCKSDB_LITE
  (void)shared;
  (void)num_c;
#else
  // TODO (yanqin) maybe parallelize. Currently execute in single thread.
  WriteOptions write_opts;
  write_opts.disableWAL = true;
  write_opts.sync = false;
  Random rng(shared->GetSeed());
  assert(txn_db_);

  const uint32_t c_limit = static_cast<uint32_t>(num_c);
  for (uint32_t c = 0; c < c_limit; ++c) {
    // Exclusive upper bound of the `a` values belonging to this `c`.
    const auto a_end = (c + 1) * kInitialCARatio;
    for (uint32_t a = c * kInitialCARatio; a < a_end; ++a) {
      Record rec(a, /*_b=*/rng.Next(), c);
      WriteBatch batch;

      // Primary index entry.
      const auto pk_entry = rec.EncodePrimaryIndexEntry();
      Status status = batch.Put(pk_entry.first, pk_entry.second);
      assert(status.ok());

      // Secondary index entry.
      const auto sk_entry = rec.EncodeSecondaryIndexEntry();
      status = batch.Put(sk_entry.first, sk_entry.second);
      assert(status.ok());

      // Both entries are written together.
      status = txn_db_->Write(write_opts, &batch);
      assert(status.ok());

      // TODO (yanqin): make the following check optional, especially when data
      // size is large.
      // Round-trip the secondary index entry and compare with the source
      // record. `b` is set by hand before decoding.
      Record decoded;
      decoded.SetB(rec.b_value());
      status = decoded.DecodeSecondaryIndexEntry(sk_entry.first,
                                                 sk_entry.second);
      assert(status.ok());
      assert(decoded == rec);
    }
  }

  Status flush_status = db_->Flush(FlushOptions());
  assert(flush_status.ok());
  next_a_.store(static_cast<uint32_t>((num_c + 1) * kInitialCARatio));
  fprintf(stdout, "DB preloaded with %d entries\n",
          static_cast<int>(num_c * kInitialCARatio));
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Factory: heap-allocates a `MultiOpsTxnsStressTest` and returns it through
// a `StressTest` pointer. The returned pointer comes from `new`; nothing here
// releases it.
StressTest* CreateMultiOpsTxnsStressTest() {
  StressTest* const stress_test = new MultiOpsTxnsStressTest();
  return stress_test;
}
|
|
|
|
void CheckAndSetOptionsForMultiOpsTxnStressTest() {
|
|
#ifndef ROCKSDB_LITE
|
|
if (FLAGS_test_batches_snapshots || FLAGS_test_cf_consistency) {
|
|
fprintf(stderr,
|
|
"-test_multi_ops_txns is not compatible with "
|
|
"-test_bathces_snapshots and -test_cf_consistency\n");
|
|
exit(1);
|
|
}
|
|
if (!FLAGS_use_txn) {
|
|
fprintf(stderr, "-use_txn must be true if -test_multi_ops_txns\n");
|
|
exit(1);
|
|
}
|
|
if (FLAGS_clear_column_family_one_in > 0) {
|
|
fprintf(stderr,
|
|
"-test_multi_ops_txns is not compatible with clearing column "
|
|
"families\n");
|
|
exit(1);
|
|
}
|
|
if (FLAGS_column_families > 1) {
|
|
// TODO (yanqin) support separating primary index and secondary index in
|
|
// different column families.
|
|
fprintf(stderr,
|
|
"-test_multi_ops_txns currently does not use more than one column "
|
|
"family\n");
|
|
exit(1);
|
|
}
|
|
if (FLAGS_writepercent > 0 || FLAGS_delpercent > 0 ||
|
|
FLAGS_delrangepercent > 0) {
|
|
fprintf(stderr,
|
|
"-test_multi_ops_txns requires that -writepercent, -delpercent and "
|
|
"-delrangepercent be 0\n");
|
|
exit(1);
|
|
}
|
|
#else
|
|
fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n");
|
|
exit(1);
|
|
#endif // !ROCKSDB_LITE
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // GFLAGS
|