rocksdb/db_stress_tool/no_batched_ops_stress.cc

1294 lines
47 KiB
C++
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include "rocksdb/utilities/transaction_db.h"
Account memory of FileMetaData in global memory limit (#9924) Summary: **Context/Summary:** As revealed by heap profiling, allocation of `FileMetaData` for [newly created file added to a Version](https://github.com/facebook/rocksdb/pull/9924/files#diff-a6aa385940793f95a2c5b39cc670bd440c4547fa54fd44622f756382d5e47e43R774) can consume significant heap memory. This PR is to account that toward our global memory limit based on block cache capacity. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9924 Test Plan: - Previous `make check` verified there are only 2 places where the memory of the allocated `FileMetaData` can be released - New unit test `TEST_P(ChargeFileMetadataTestWithParam, Basic)` - db bench (CPU cost of `charge_file_metadata` in write and compact) - **write micros/op: -0.24%** : `TEST_TMPDIR=/dev/shm/testdb ./db_bench -benchmarks=fillseq -db=$TEST_TMPDIR -charge_file_metadata=1 (remove this option for pre-PR) -disable_auto_compactions=1 -write_buffer_size=100000 -num=4000000 | egrep 'fillseq'` - **compact micros/op -0.87%** : `TEST_TMPDIR=/dev/shm/testdb ./db_bench -benchmarks=fillseq -db=$TEST_TMPDIR -charge_file_metadata=1 -disable_auto_compactions=1 -write_buffer_size=100000 -num=4000000 -numdistinct=1000 && ./db_bench -benchmarks=compact -db=$TEST_TMPDIR -use_existing_db=1 -charge_file_metadata=1 -disable_auto_compactions=1 | egrep 'compact'` table 1 - write #-run | (pre-PR) avg micros/op | std micros/op | (post-PR) micros/op | std micros/op | change (%) -- | -- | -- | -- | -- | -- 10 | 3.9711 | 0.264408 | 3.9914 | 0.254563 | 0.5111933721 20 | 3.83905 | 0.0664488 | 3.8251 | 0.0695456 | -0.3633711465 40 | 3.86625 | 0.136669 | 3.8867 | 0.143765 | 0.5289363078 80 | 3.87828 | 0.119007 | 3.86791 | 0.115674 | **-0.2673865734** 160 | 3.87677 | 0.162231 | 3.86739 | 0.16663 | **-0.2419539978** table 2 - compact #-run | (pre-PR) avg micros/op | std micros/op | (post-PR) micros/op | std micros/op | change (%) -- | -- | -- | -- | -- | -- 10 | 2,399,650.00 | 96,375.80 | 2,359,537.00 | 53,243.60 | -1.67 20 | 2,410,480.00 | 89,988.00 | 2,433,580.00 | 91,121.20 | 0.96 40 | 2.41E+06 | 121811 | 2.39E+06 | 131525 | **-0.96** 80 | 2.40E+06 | 134503 | 2.39E+06 | 108799 | **-0.78** - stress test: `python3 tools/db_crashtest.py blackbox --charge_file_metadata=1 --cache_size=1` killed as normal Reviewed By: ajkr Differential Revision: D36055583 Pulled By: hx235 fbshipit-source-id: b60eab94707103cb1322cf815f05810ef0232625
2022-06-14 20:06:40 +00:00
#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
class NonBatchedOpsStressTest : public StressTest {
public:
NonBatchedOpsStressTest() {}
virtual ~NonBatchedOpsStressTest() {}
void VerifyDb(ThreadState* thread) const override {
Add rate limiter priority to ReadOptions (#9424) Summary: Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working. `RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction, but rather performed according to the passed in `Env::IOPriority`. Now the compaction read rate limiting is supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`. There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases I believe it would be good to let user control the priority some day (e.g., file footer reads), and no TODO in cases I believe it doesn't matter (e.g., trace file reads). The API doc only lists the missing cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API to provide `ReadOptions` or `Env::IOPriority`, so I didn't count that as missing. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424 Test Plan: - new unit tests - new benchmarks on ~50MB database with 1MB/s read rate limit and 100ms refill interval; verified with strace reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart. - setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true` - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true` - crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10` Reviewed By: hx235 Differential Revision: D33747386 Pulled By: ajkr fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
2022-02-17 07:17:03 +00:00
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions options(FLAGS_verify_checksum, true);
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GetNowNanos();
ts = ts_str;
options.timestamp = &ts;
}
auto shared = thread->shared;
const int64_t max_key = shared->GetMaxKey();
const int64_t keys_per_thread = max_key / shared->GetNumThreads();
int64_t start = keys_per_thread * thread->tid;
int64_t end = start + keys_per_thread;
uint64_t prefix_to_use =
(FLAGS_prefix_size < 0) ? 1 : static_cast<size_t>(FLAGS_prefix_size);
if (thread->tid == shared->GetNumThreads() - 1) {
end = max_key;
}
for (size_t cf = 0; cf < column_families_.size(); ++cf) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
if (thread->rand.OneIn(4)) {
// 1/4 chance use iterator to verify this range
Slice prefix;
std::string seek_key = Key(start);
std::unique_ptr<Iterator> iter(
db_->NewIterator(options, column_families_[cf]));
iter->Seek(seek_key);
prefix = Slice(seek_key.data(), prefix_to_use);
for (auto i = start; i < end; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
std::string from_db;
std::string keystr = Key(i);
Slice k = keystr;
Slice pfx = Slice(keystr.data(), prefix_to_use);
// Reseek when the prefix changes
if (prefix_to_use > 0 && prefix.compare(pfx) != 0) {
iter->Seek(k);
seek_key = keystr;
prefix = Slice(seek_key.data(), prefix_to_use);
}
Status s = iter->status();
if (iter->Valid()) {
Slice iter_key = iter->key();
if (iter->key().compare(k) > 0) {
s = Status::NotFound(Slice());
} else if (iter->key().compare(k) == 0) {
from_db = iter->value().ToString();
iter->Next();
} else if (iter_key.compare(k) < 0) {
VerificationAbort(shared, "An out of range key was found",
static_cast<int>(cf), i);
}
} else {
// The iterator found no value for the key in question, so do not
// move to the next item in the iterator
s = Status::NotFound();
}
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
s, true);
if (from_db.length()) {
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
from_db.data(), from_db.length());
}
}
} else if (thread->rand.OneIn(3)) {
// 1/4 chance use Get to verify this range
for (auto i = start; i < end; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
std::string from_db;
std::string keystr = Key(i);
Slice k = keystr;
Status s = db_->Get(options, column_families_[cf], k, &from_db);
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
s, true);
if (from_db.length()) {
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
from_db.data(), from_db.length());
}
}
} else if (thread->rand.OneIn(2)) {
// 1/4 chance use MultiGet to verify this range
for (auto i = start; i < end;) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
// Keep the batch size to some reasonable value
size_t batch_size = thread->rand.Uniform(128) + 1;
batch_size = std::min<size_t>(batch_size, end - i);
std::vector<std::string> keystrs(batch_size);
std::vector<Slice> keys(batch_size);
std::vector<PinnableSlice> values(batch_size);
std::vector<Status> statuses(batch_size);
for (size_t j = 0; j < batch_size; ++j) {
keystrs[j] = Key(i + j);
keys[j] = Slice(keystrs[j].data(), keystrs[j].length());
}
db_->MultiGet(options, column_families_[cf], batch_size, keys.data(),
values.data(), statuses.data());
for (size_t j = 0; j < batch_size; ++j) {
Status s = statuses[j];
std::string from_db = values[j].ToString();
VerifyOrSyncValue(static_cast<int>(cf), i + j, options, shared,
from_db, s, true);
if (from_db.length()) {
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i + j),
from_db.data(), from_db.length());
}
}
i += batch_size;
}
} else {
// 1/4 chance use GetMergeOperand to verify this range
// Start off with small size that will be increased later if necessary
std::vector<PinnableSlice> values(4);
GetMergeOperandsOptions merge_operands_info;
merge_operands_info.expected_max_number_of_operands =
static_cast<int>(values.size());
for (auto i = start; i < end; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
std::string from_db;
std::string keystr = Key(i);
Slice k = keystr;
int number_of_operands = 0;
Status s = db_->GetMergeOperands(options, column_families_[cf], k,
values.data(), &merge_operands_info,
&number_of_operands);
if (s.IsIncomplete()) {
// Need to resize values as there are more than values.size() merge
// operands on this key. Should only happen a few times when we
// encounter a key that had more merge operands than any key seen so
// far
values.resize(number_of_operands);
merge_operands_info.expected_max_number_of_operands =
static_cast<int>(number_of_operands);
s = db_->GetMergeOperands(options, column_families_[cf], k,
values.data(), &merge_operands_info,
&number_of_operands);
}
// Assumed here that GetMergeOperands always sets number_of_operand
if (number_of_operands) {
from_db = values[number_of_operands - 1].ToString();
}
VerifyOrSyncValue(static_cast<int>(cf), i, options, shared, from_db,
s, true);
if (from_db.length()) {
PrintKeyValue(static_cast<int>(cf), static_cast<uint32_t>(i),
from_db.data(), from_db.length());
}
}
}
}
}
#ifndef ROCKSDB_LITE
void ContinuouslyVerifyDb(ThreadState* thread) const override {
if (!cmp_db_) {
return;
}
assert(cmp_db_);
assert(!cmp_cfhs_.empty());
Status s = cmp_db_->TryCatchUpWithPrimary();
if (!s.ok()) {
assert(false);
exit(1);
}
const auto checksum_column_family = [](Iterator* iter,
uint32_t* checksum) -> Status {
assert(nullptr != checksum);
uint32_t ret = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
}
*checksum = ret;
return iter->status();
};
auto* shared = thread->shared;
assert(shared);
const int64_t max_key = shared->GetMaxKey();
ReadOptions read_opts(FLAGS_verify_checksum, true);
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GetNowNanos();
ts = ts_str;
read_opts.timestamp = &ts;
}
static Random64 rand64(shared->GetSeed());
{
uint32_t crc = 0;
std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
s = checksum_column_family(it.get(), &crc);
if (!s.ok()) {
fprintf(stderr, "Computing checksum of default cf: %s\n",
s.ToString().c_str());
assert(false);
}
}
for (auto* handle : cmp_cfhs_) {
if (thread->rand.OneInOpt(3)) {
// Use Get()
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
std::string value;
std::string key_ts;
s = cmp_db_->Get(read_opts, handle, key_str, &value,
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
s.PermitUncheckedError();
} else {
// Use range scan
std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts, handle));
uint32_t rnd = (thread->rand.Next()) % 4;
if (0 == rnd) {
// SeekToFirst() + Next()*5
read_opts.total_order_seek = true;
iter->SeekToFirst();
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
}
} else if (1 == rnd) {
// SeekToLast() + Prev()*5
read_opts.total_order_seek = true;
iter->SeekToLast();
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
}
} else if (2 == rnd) {
// Seek() +Next()*5
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
iter->Seek(key_str);
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
}
} else {
// SeekForPrev() + Prev()*5
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
iter->SeekForPrev(key_str);
for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
}
}
}
}
}
#else
void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {}
#endif // ROCKSDB_LITE
void MaybeClearOneColumnFamily(ThreadState* thread) override {
if (FLAGS_column_families > 1) {
if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {
// drop column family and then create it again (can't drop default)
int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1;
std::string new_name =
std::to_string(new_column_family_name_.fetch_add(1));
{
MutexLock l(thread->shared->GetMutex());
fprintf(
stdout,
"[CF %d] Dropping and recreating column family. new name: %s\n",
cf, new_name.c_str());
}
thread->shared->LockColumnFamily(cf);
Status s = db_->DropColumnFamily(column_families_[cf]);
delete column_families_[cf];
if (!s.ok()) {
fprintf(stderr, "dropping column family error: %s\n",
s.ToString().c_str());
std::terminate();
}
s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name,
&column_families_[cf]);
column_family_names_[cf] = new_name;
thread->shared->ClearColumnFamily(cf);
if (!s.ok()) {
fprintf(stderr, "creating column family error: %s\n",
s.ToString().c_str());
std::terminate();
}
thread->shared->UnlockColumnFamily(cf);
}
}
}
bool ShouldAcquireMutexOnKey() const override { return true; }
bool IsStateTracked() const override { return true; }
Status TestGet(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
auto cfh = column_families_[rand_column_families[0]];
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
std::string from_db;
int error_count = 0;
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false;
}
ReadOptions read_opts_copy = read_opts;
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
read_opts_copy);
Status s = db_->Get(read_opts_copy, cfh, key, &from_db);
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
if (s.ok()) {
if (fault_fs_guard) {
if (error_count && !SharedState::ignore_read_error) {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "Didn't get expected error from Get\n");
fprintf(stderr, "Callstack that injected the fault\n");
fault_fs_guard->PrintFaultBacktrace();
std::terminate();
}
}
// found case
thread->stats.AddGets(1, 1);
// we only have the latest expected state
Add Iterator test against expected state to stress test (#10538) Summary: As mentioned in https://github.com/facebook/rocksdb/pull/5506#issuecomment-506021913, `db_stress` does not have much verification for iterator correctness. It has a `TestIterate()` function, but that is mainly for comparing results between two iterators, one with `total_order_seek` and the other optionally sets auto_prefix, upper/lower bounds. Commit 49a0581ad2462e31aa3f768afa769e0d33390f33 added a new `TestIterateAgainstExpected()` function that compares iterator against expected state. It locks a range of keys, creates an iterator, does a random sequence of `Next/Prev` and compares against expected state. This PR is based on that commit, the main changes include some logs (for easier debugging if a test fails), a forward and backward scan to cover the entire locked key range, and a flag for optionally turning on this version of Iterator testing. Added constraint that the checks against expected state in `TestIterateAgainstExpected()` and in `TestGet()` are only turned on when `--skip_verifydb` flag is not set. Remove the change log introduced in https://github.com/facebook/rocksdb/issues/10553. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10538 Test Plan: Run `db_stress` with `--verify_iterator_with_expected_state_one_in=1`, and a large `--iterpercent` and `--num_iterations`. Checked `op_logs` manually to ensure expected coverage. Tweaked part of the code in https://github.com/facebook/rocksdb/issues/10449 and stress test was able to catch it. - internally run various flavor of crash test Reviewed By: ajkr Differential Revision: D38847269 Pulled By: cbi42 fbshipit-source-id: 8b4402a9bba9f6cfa08051943cd672579d489599
2022-08-24 21:59:50 +00:00
if (!FLAGS_skip_verifydb && !read_opts_copy.timestamp &&
thread->shared->Get(rand_column_families[0], rand_keys[0]) ==
SharedState::DELETION_SENTINEL) {
thread->shared->SetVerificationFailure();
fprintf(stderr,
"error : inconsistent values for key %s: Get returns %s, "
"expected state does not have the key.\n",
key.ToString(true).c_str(), StringToHex(from_db).c_str());
}
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
Add Iterator test against expected state to stress test (#10538) Summary: As mentioned in https://github.com/facebook/rocksdb/pull/5506#issuecomment-506021913, `db_stress` does not have much verification for iterator correctness. It has a `TestIterate()` function, but that is mainly for comparing results between two iterators, one with `total_order_seek` and the other optionally sets auto_prefix, upper/lower bounds. Commit 49a0581ad2462e31aa3f768afa769e0d33390f33 added a new `TestIterateAgainstExpected()` function that compares iterator against expected state. It locks a range of keys, creates an iterator, does a random sequence of `Next/Prev` and compares against expected state. This PR is based on that commit, the main changes include some logs (for easier debugging if a test fails), a forward and backward scan to cover the entire locked key range, and a flag for optionally turning on this version of Iterator testing. Added constraint that the checks against expected state in `TestIterateAgainstExpected()` and in `TestGet()` are only turned on when `--skip_verifydb` flag is not set. Remove the change log introduced in https://github.com/facebook/rocksdb/issues/10553. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10538 Test Plan: Run `db_stress` with `--verify_iterator_with_expected_state_one_in=1`, and a large `--iterpercent` and `--num_iterations`. Checked `op_logs` manually to ensure expected coverage. Tweaked part of the code in https://github.com/facebook/rocksdb/issues/10449 and stress test was able to catch it. - internally run various flavor of crash test Reviewed By: ajkr Differential Revision: D38847269 Pulled By: cbi42 fbshipit-source-id: 8b4402a9bba9f6cfa08051943cd672579d489599
2022-08-24 21:59:50 +00:00
if (!FLAGS_skip_verifydb && !read_opts_copy.timestamp) {
auto expected =
thread->shared->Get(rand_column_families[0], rand_keys[0]);
if (expected != SharedState::DELETION_SENTINEL &&
expected != SharedState::UNKNOWN_SENTINEL) {
thread->shared->SetVerificationFailure();
fprintf(stderr,
"error : inconsistent values for key %s: expected state has "
"the key, Get() returns NotFound.\n",
key.ToString(true).c_str());
}
}
} else {
if (error_count == 0) {
// errors case
thread->stats.AddErrors(1);
} else {
thread->stats.AddVerifiedErrors(1);
}
}
if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection();
}
return s;
}
std::vector<Status> TestMultiGet(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
size_t num_keys = rand_keys.size();
std::vector<std::string> key_str;
std::vector<Slice> keys;
key_str.reserve(num_keys);
keys.reserve(num_keys);
std::vector<PinnableSlice> values(num_keys);
std::vector<Status> statuses(num_keys);
ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
int error_count = 0;
// Do a consistency check between Get and MultiGet. Don't do it too
// often as it will slow db_stress down
bool do_consistency_check = thread->rand.OneIn(4);
ReadOptions readoptionscopy = read_opts;
if (do_consistency_check) {
readoptionscopy.snapshot = db_->GetSnapshot();
}
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForPointLookup(thread, read_ts_str, read_ts_slice,
readoptionscopy);
Add rate-limiting support to batched MultiGet() (#10159) Summary: **Context/Summary:** https://github.com/facebook/rocksdb/pull/9424 added rate-limiting support for user reads, which does not include batched `MultiGet()`s that call `RandomAccessFileReader::MultiRead()`. The reason is that it's harder (compared with RandomAccessFileReader::Read()) to implement the ideal rate-limiting where we first call `RateLimiter::RequestToken()` for allowed bytes to multi-read and then consume those bytes by satisfying as many requests in `MultiRead()` as possible. For example, it can be tricky to decide whether we want partially fulfilled requests within one `MultiRead()` or not. However, due to a recent urgent user request, we decide to pursue an elementary (but a conditionally ineffective) solution where we accumulate enough rate limiter requests toward the total bytes needed by one `MultiRead()` before doing that `MultiRead()`. This is not ideal when the total bytes are huge as we will actually consume a huge bandwidth from rate-limiter causing a burst on disk. This is not what we ultimately want with rate limiter. Therefore a follow-up work is noted through TODO comments. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10159 Test Plan: - Modified existing unit test `DBRateLimiterOnReadTest/DBRateLimiterOnReadTest.NewMultiGet` - Traced the underlying system calls `io_uring_enter` and verified they are 10 seconds apart from each other correctly under the setting of `strace -ftt -e trace=io_uring_enter ./db_bench -benchmarks=multireadrandom -db=/dev/shm/testdb2 -readonly -num=50 -threads=1 -multiread_batched=1 -batch_size=100 -duration=10 -rate_limiter_bytes_per_sec=200 -rate_limiter_refill_period_us=1000000 -rate_limit_bg_reads=1 -disable_auto_compactions=1 -rate_limit_user_ops=1` where each `MultiRead()` read about 2000 bytes (inspected by debugger) and the rate limiter grants 200 bytes per seconds. - Stress test: - Verified `./db_stress (-test_cf_consistency=1/test_batches_snapshots=1) -use_multiget=1 -cache_size=1048576 -rate_limiter_bytes_per_sec=10241024 -rate_limit_bg_reads=1 -rate_limit_user_ops=1` work Reviewed By: ajkr, anand1976 Differential Revision: D37135172 Pulled By: hx235 fbshipit-source-id: 73b8e8f14761e5d4b77235dfe5d41f4eea968bcd
2022-06-17 23:40:47 +00:00
readoptionscopy.rate_limiter_priority =
FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
// To appease clang analyzer
const bool use_txn = FLAGS_use_txn;
// Create a transaction in order to write some data. The purpose is to
// exercise WriteBatchWithIndex::MultiGetFromBatchAndDB. The transaction
// will be rolled back once MultiGet returns.
#ifndef ROCKSDB_LITE
Transaction* txn = nullptr;
if (use_txn) {
WriteOptions wo;
Rate-limit automatic WAL flush after each user write (#9607) Summary: **Context:** WAL flush is currently not rate-limited by `Options::rate_limiter`. This PR is to provide rate-limiting to auto WAL flush, the one that automatically happen after each user write operation (i.e, `Options::manual_wal_flush == false`), by adding `WriteOptions::rate_limiter_options`. Note that we are NOT rate-limiting WAL flush that do NOT automatically happen after each user write, such as `Options::manual_wal_flush == true + manual FlushWAL()` (rate-limiting multiple WAL flushes), for the benefits of: - being consistent with [ReadOptions::rate_limiter_priority](https://github.com/facebook/rocksdb/blob/7.0.fb/include/rocksdb/options.h#L515) - being able to turn off some WAL flush's rate-limiting but not all (e.g, turn off specific the WAL flush of a critical user write like a service's heartbeat) `WriteOptions::rate_limiter_options` only accept `Env::IO_USER` and `Env::IO_TOTAL` currently due to an implementation constraint. - The constraint is that we currently queue parallel writes (including WAL writes) based on FIFO policy which does not factor rate limiter priority into this layer's scheduling. If we allow lower priorities such as `Env::IO_HIGH/MID/LOW` and such writes specified with lower priorities occurs before ones specified with higher priorities (even just by a tiny bit in arrival time), the former would have blocked the latter, leading to a "priority inversion" issue and contradictory to what we promise for rate-limiting priority. Therefore we only allow `Env::IO_USER` and `Env::IO_TOTAL` right now before improving that scheduling. A pre-requisite to this feature is to support operation-level rate limiting in `WritableFileWriter`, which is also included in this PR. **Summary:** - Renamed test suite `DBRateLimiterTest to DBRateLimiterOnReadTest` for adding a new test suite - Accept `rate_limiter_priority` in `WritableFileWriter`'s private and public write functions - Passed `WriteOptions::rate_limiter_options` to `WritableFileWriter` in the path of automatic WAL flush. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9607 Test Plan: - Added new unit test to verify existing flush/compaction rate-limiting does not break, since `DBTest, RateLimitingTest` is disabled and current db-level rate-limiting tests focus on read only (e.g, `db_rate_limiter_test`, `DBTest2, RateLimitedCompactionReads`). - Added new unit test `DBRateLimiterOnWriteWALTest, AutoWalFlush` - `strace -ftt -e trace=write ./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -rate_limit_auto_wal_flush=1 -rate_limiter_bytes_per_sec=15 -rate_limiter_refill_period_us=1000000 -write_buffer_size=100000000 -disable_auto_compactions=1 -num=100` - verified that WAL flush(i.e, system-call _write_) were chunked into 15 bytes and each _write_ was roughly 1 second apart - verified the chunking disappeared when `-rate_limit_auto_wal_flush=0` - crash test: `python3 tools/db_crashtest.py blackbox --disable_wal=0 --rate_limit_auto_wal_flush=1 --rate_limiter_bytes_per_sec=10485760 --interval=10` killed as normal **Benchmarked on flush/compaction to ensure no performance regression:** - compaction with rate-limiting (see table 1, avg over 1280-run): pre-change: **915635 micros/op**; post-change: **907350 micros/op (improved by 0.106%)** ``` #!/bin/bash TEST_TMPDIR=/dev/shm/testdb START=1 NUM_DATA_ENTRY=8 N=10 rm -f compact_bmk_output.txt compact_bmk_output_2.txt dont_care_output.txt for i in $(eval echo "{$START..$NUM_DATA_ENTRY}") do NUM_RUN=$(($N*(2**($i-1)))) for j in $(eval echo "{$START..$NUM_RUN}") do ./db_bench --benchmarks=fillrandom -db=$TEST_TMPDIR -disable_auto_compactions=1 -write_buffer_size=6710886 > dont_care_output.txt && ./db_bench --benchmarks=compact -use_existing_db=1 -db=$TEST_TMPDIR -level0_file_num_compaction_trigger=1 -rate_limiter_bytes_per_sec=100000000 | egrep 'compact' done > compact_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' compact_bmk_output.txt >> compact_bmk_output_2.txt done ``` - compaction w/o rate-limiting (see table 2, avg over 640-run): pre-change: **822197 micros/op**; post-change: **823148 micros/op (regressed by 0.12%)** ``` Same as above script, except that -rate_limiter_bytes_per_sec=0 ``` - flush with rate-limiting (see table 3, avg over 320-run, run on the [patch](https://github.com/hx235/rocksdb/commit/ee5c6023a9f6533fab9afdc681568daa21da4953) to augment current db_bench ): pre-change: **745752 micros/op**; post-change: **745331 micros/op (regressed by 0.06 %)** ``` #!/bin/bash TEST_TMPDIR=/dev/shm/testdb START=1 NUM_DATA_ENTRY=8 N=10 rm -f flush_bmk_output.txt flush_bmk_output_2.txt for i in $(eval echo "{$START..$NUM_DATA_ENTRY}") do NUM_RUN=$(($N*(2**($i-1)))) for j in $(eval echo "{$START..$NUM_RUN}") do ./db_bench -db=$TEST_TMPDIR -write_buffer_size=1048576000 -num=1000000 -rate_limiter_bytes_per_sec=100000000 -benchmarks=fillseq,flush | egrep 'flush' done > flush_bmk_output.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' flush_bmk_output.txt >> flush_bmk_output_2.txt done ``` - flush w/o rate-limiting (see table 4, avg over 320-run, run on the [patch](https://github.com/hx235/rocksdb/commit/ee5c6023a9f6533fab9afdc681568daa21da4953) to augment current db_bench): pre-change: **487512 micros/op**, post-change: **485856 micors/ops (improved by 0.34%)** ``` Same as above script, except that -rate_limiter_bytes_per_sec=0 ``` | table 1 - compact with rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 896978 | 16046.9 | 901242 | 15670.9 | 0.475373978 20 | 893718 | 15813 | 886505 | 17544.7 | -0.8070778478 40 | 900426 | 23882.2 | 894958 | 15104.5 | -0.6072681153 80 | 906635 | 21761.5 | 903332 | 23948.3 | -0.3643141948 160 | 898632 | 21098.9 | 907583 | 21145 | 0.9960695813 3.20E+02 | 905252 | 22785.5 | 908106 | 25325.5 | 0.3152713278 6.40E+02 | 905213 | 23598.6 | 906741 | 21370.5 | 0.1688000504 **1.28E+03** | **908316** | **23533.1** | **907350** | **24626.8** | **-0.1063506533** average over #-run | 901896.25 | 21064.9625 | 901977.125 | 20592.025 | 0.008967217682 | table 2 - compact w/o rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 811211 | 26996.7 | 807586 | 28456.4 | -0.4468627768 20 | 815465 | 14803.7 | 814608 | 28719.7 | -0.105093413 40 | 809203 | 26187.1 | 797835 | 25492.1 | -1.404839082 80 | 822088 | 28765.3 | 822192 | 32840.4 | 0.01265071379 160 | 821719 | 36344.7 | 821664 | 29544.9 | -0.006693285661 3.20E+02 | 820921 | 27756.4 | 821403 | 28347.7 | 0.05871454135 **6.40E+02** | **822197** | **28960.6** | **823148** | **30055.1** | **0.1156657103** average over #-run | 8.18E+05 | 2.71E+04 | 8.15E+05 | 2.91E+04 | -0.25 | table 3 - flush with rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 741721 | 11770.8 | 740345 | 5949.76 | -0.1855144994 20 | 735169 | 3561.83 | 743199 | 9755.77 | 1.09226586 40 | 743368 | 8891.03 | 742102 | 8683.22 | -0.1703059588 80 | 742129 | 8148.51 | 743417 | 9631.58| 0.1735547324 160 | 749045 | 9757.21 | 746256 | 9191.86 | -0.3723407806 **3.20E+02** | **745752** | **9819.65** | **745331** | **9840.62** | **-0.0564530836** 6.40E+02 | 749006 | 11080.5 | 748173 | 10578.7 | -0.1112140624 average over #-run | 743741.4286 | 9004.218571 | 744117.5714 | 9090.215714 | 0.05057441238 | table 4 - flush w/o rate-limiting| #-run | (pre-change) avg micros/op | std micros/op | (post-change) avg micros/op | std micros/op | change in avg micros/op (%) -- | -- | -- | -- | -- | -- 10 | 477283 | 24719.6 | 473864 | 12379 | -0.7163464863 20 | 486743 | 20175.2 | 502296 | 23931.3 | 3.195320734 40 | 482846 | 15309.2 | 489820 | 22259.5 | 1.444352858 80 | 491490 | 21883.1 | 490071 | 23085.7 | -0.2887139108 160 | 493347 | 28074.3 | 483609 | 21211.7 | -1.973864238 **3.20E+02** | **487512** | **21401.5** | **485856** | **22195.2** | **-0.3396839462** 6.40E+02 | 490307 | 25418.6 | 485435 | 22405.2 | -0.9936631539 average over #-run | 4.87E+05 | 2.24E+04 | 4.87E+05 | 2.11E+04 | 0.00E+00 Reviewed By: ajkr Differential Revision: D34442441 Pulled By: hx235 fbshipit-source-id: 4790f13e1e5c0a95ae1d1cc93ffcf69dc6e78bdd
2022-03-08 21:19:39 +00:00
if (FLAGS_rate_limit_auto_wal_flush) {
wo.rate_limiter_priority = Env::IO_USER;
}
Status s = NewTxn(wo, &txn);
if (!s.ok()) {
fprintf(stderr, "NewTxn: %s\n", s.ToString().c_str());
std::terminate();
}
}
#endif
for (size_t i = 0; i < num_keys; ++i) {
key_str.emplace_back(Key(rand_keys[i]));
keys.emplace_back(key_str.back());
#ifndef ROCKSDB_LITE
if (use_txn) {
// With a 1 in 10 probability, insert the just added key in the batch
// into the transaction. This will create an overlap with the MultiGet
// keys and exercise some corner cases in the code
if (thread->rand.OneIn(10)) {
int op = thread->rand.Uniform(2);
Status s;
switch (op) {
case 0:
case 1: {
uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
char value[100];
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
if (op == 0) {
s = txn->Put(cfh, keys.back(), v);
} else {
s = txn->Merge(cfh, keys.back(), v);
}
break;
}
case 2:
s = txn->Delete(cfh, keys.back());
break;
default:
assert(false);
}
if (!s.ok()) {
fprintf(stderr, "Transaction put: %s\n", s.ToString().c_str());
std::terminate();
}
}
}
#endif
}
if (!use_txn) {
if (fault_fs_guard) {
fault_fs_guard->EnableErrorInjection();
SharedState::ignore_read_error = false;
}
db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
statuses.data());
if (fault_fs_guard) {
error_count = fault_fs_guard->GetAndResetErrorCount();
}
} else {
#ifndef ROCKSDB_LITE
txn->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
statuses.data());
#endif
}
if (fault_fs_guard && error_count && !SharedState::ignore_read_error) {
int stat_nok = 0;
for (const auto& s : statuses) {
if (!s.ok() && !s.IsNotFound()) {
stat_nok++;
}
}
if (stat_nok < error_count) {
// Grab mutex so multiple thread don't try to print the
// stack trace at the same time
MutexLock l(thread->shared->GetMutex());
fprintf(stderr, "Didn't get expected error from MultiGet. \n");
fprintf(stderr, "num_keys %zu Expected %d errors, seen %d\n", num_keys,
error_count, stat_nok);
fprintf(stderr, "Callstack that injected the fault\n");
fault_fs_guard->PrintFaultBacktrace();
std::terminate();
}
}
if (fault_fs_guard) {
fault_fs_guard->DisableErrorInjection();
}
for (size_t i = 0; i < statuses.size(); ++i) {
Status s = statuses[i];
bool is_consistent = true;
// Only do the consistency check if no error was injected and MultiGet
// didn't return an unexpected error
if (do_consistency_check && !error_count && (s.ok() || s.IsNotFound())) {
Status tmp_s;
std::string value;
if (use_txn) {
#ifndef ROCKSDB_LITE
tmp_s = txn->Get(readoptionscopy, cfh, keys[i], &value);
#endif // ROCKSDB_LITE
} else {
tmp_s = db_->Get(readoptionscopy, cfh, keys[i], &value);
}
if (!tmp_s.ok() && !tmp_s.IsNotFound()) {
fprintf(stderr, "Get error: %s\n", s.ToString().c_str());
is_consistent = false;
} else if (!s.ok() && tmp_s.ok()) {
fprintf(stderr, "MultiGet returned different results with key %s\n",
keys[i].ToString(true).c_str());
fprintf(stderr, "Get returned ok, MultiGet returned not found\n");
is_consistent = false;
} else if (s.ok() && tmp_s.IsNotFound()) {
fprintf(stderr, "MultiGet returned different results with key %s\n",
keys[i].ToString(true).c_str());
fprintf(stderr, "MultiGet returned ok, Get returned not found\n");
is_consistent = false;
} else if (s.ok() && value != values[i].ToString()) {
fprintf(stderr, "MultiGet returned different results with key %s\n",
keys[i].ToString(true).c_str());
fprintf(stderr, "MultiGet returned value %s\n",
values[i].ToString(true).c_str());
fprintf(stderr, "Get returned value %s\n", value.c_str());
is_consistent = false;
}
}
if (!is_consistent) {
fprintf(stderr, "TestMultiGet error: is_consistent is false\n");
thread->stats.AddErrors(1);
// Fail fast to preserve the DB state
thread->shared->SetVerificationFailure();
break;
} else if (s.ok()) {
// found case
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
// not found case
thread->stats.AddGets(1, 0);
} else if (s.IsMergeInProgress() && use_txn) {
// With txn this is sometimes expected.
thread->stats.AddGets(1, 1);
} else {
if (error_count == 0) {
// errors case
fprintf(stderr, "MultiGet error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddVerifiedErrors(1);
}
}
}
if (readoptionscopy.snapshot) {
db_->ReleaseSnapshot(readoptionscopy.snapshot);
}
fix transaction rollback in db_stress TestMultiGet (#6873) Summary: There were further uses of `txn` after `RollbackTxn(txn)` leading to stress test errors. Moved the rollback to the end of the function. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6873 Test Plan: found a command from the crash test that previously failed immediately under TSAN; verified now it succeeds. ``` ./db_stress --acquire_snapshot_one_in=10000 --allow_concurrent_memtable_write=1 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=1 --block_size=16384 --bloom_bits=222.913637674 --bottommost_compression_type=none --cache_index_and_filter_blocks=1 --cache_size=1048576 --checkpoint_one_in=0 --checksum_type=kCRC32c --clear_column_family_one_in=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --compaction_style=1 --compaction_ttl=0 --compression_max_dict_bytes=0 --compression_parallel_threads=1 --compression_type=zstd --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --db=/dev/shm/rocksdb/rocksdb_crashtest_whitebox --db_write_buffer_size=1048576 --delpercent=5 --delrangepercent=0 --destroy_db_initially=0 --disable_wal=0 --enable_pipelined_write=0 --flush_one_in=1000000 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=1000000 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=12 --index_type=2 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --log2_keys_per_lock=22 --long_running_snapshots=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=1000000 --max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=1048576 --max_write_buffer_number=3 --memtablerep=skip_list --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_levels=1 --open_files=100 --ops_per_thread=200000 --partition_filters=1 --pause_background_one_in=1000000 --periodic_compaction_seconds=0 --prefixpercent=5 --progress_reports=0 --read_fault_one_in=0 --readpercent=45 --recycle_log_file_num=0 --reopen=20 --snapshot_hold_ops=100000 --subcompactions=4 --sync=0 --sync_fault_injection=False --target_file_size_base=2097152 --target_file_size_multiplier=2 --test_batches_snapshots=0 --txn_write_policy=1 --unordered_write=1 --use_block_based_filter=0 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=1 --use_merge=1 --use_multiget=1 --use_txn=1 --verify_checksum=1 --verify_checksum_one_in=1000000 --verify_db_one_in=100000 --write_buffer_size=4194304 --write_dbid_to_manifest=0 --writepercent=35 ``` Reviewed By: pdillinger Differential Revision: D21708338 Pulled By: ajkr fbshipit-source-id: dcf55cddee0a14f429a75e7a8a505acf8025f2b1
2020-05-24 22:25:43 +00:00
if (use_txn) {
#ifndef ROCKSDB_LITE
RollbackTxn(txn);
#endif
}
return statuses;
}
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
auto cfh = column_families_[rand_column_families[0]];
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
std::string upper_bound;
Slice ub_slice;
ReadOptions ro_copy = read_opts;
// Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
// For half of the time, set the upper bound to the next prefix
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
}
std::string read_ts_str;
Slice read_ts_slice;
MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice,
ro_copy);
Iterator* iter = db_->NewIterator(ro_copy, cfh);
unsigned long count = 0;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
++count;
}
if (ro_copy.iter_start_ts == nullptr) {
assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
}
Status s = iter->status();
if (iter->status().ok()) {
thread->stats.AddPrefixes(1, count);
} else {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
}
delete iter;
return s;
}
Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys, char (&value)[100],
std::unique_ptr<MutexLock>& lock) override {
auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey();
int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0];
std::string write_ts_str;
Slice write_ts;
while (!shared->AllowsOverwrite(rand_key) &&
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
lock.reset();
rand_key = thread->rand.Next() % max_key;
rand_column_family = thread->rand.Next() % FLAGS_column_families;
lock.reset(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
if (FLAGS_user_timestamp_size > 0) {
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
}
}
Revise APIs related to user-defined timestamp (#8946) Summary: ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`. Namely, `WriteOptions` should not include information about "what-to-write", but should just include information about "how-to-write". According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore, this PR removes `WriteOptions::timestamp` for compliance. After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and `SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity made me reconsider doing it in another PR (maybe). For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take extra `timestamp` information when writing to `WriteBatch`es. These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list. Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to `WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps allocated already and multiple timestamps can be updated. The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp size of the default column family. This will be used to allocate space when calling APIs that do not specify a column family handle. Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing some assertions about timestamp to returning Status code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946 Test Plan: make check ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8 ./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0 Make sure there is no perf regression by running the following ``` ./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom ``` Before this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s ``` After this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s ``` Reviewed By: ltamasi Differential Revision: D33721359 Pulled By: riversand963 fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 06:17:46 +00:00
if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
write_ts_str = GetNowNanos();
Revise APIs related to user-defined timestamp (#8946) Summary: ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`. Namely, `WriteOptions` should not include information about "what-to-write", but should just include information about "how-to-write". According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore, this PR removes `WriteOptions::timestamp` for compliance. After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and `SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity made me reconsider doing it in another PR (maybe). For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take extra `timestamp` information when writing to `WriteBatch`es. These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list. Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to `WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps allocated already and multiple timestamps can be updated. The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp size of the default column family. This will be used to allocate space when calling APIs that do not specify a column family handle. Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing some assertions about timestamp to returning Status code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946 Test Plan: make check ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8 ./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0 Make sure there is no perf regression by running the following ``` ./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom ``` Before this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s ``` After this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s ``` Reviewed By: ltamasi Differential Revision: D33721359 Pulled By: riversand963 fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 06:17:46 +00:00
write_ts = write_ts_str;
}
std::string key_str = Key(rand_key);
Slice key = key_str;
ColumnFamilyHandle* cfh = column_families_[rand_column_family];
if (FLAGS_verify_before_write) {
std::string key_str2 = Key(rand_key);
Slice k = key_str2;
std::string from_db;
Status s = db_->Get(read_opts, cfh, k, &from_db);
if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared,
from_db, s, true)) {
return s;
}
}
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
Status s;
if (FLAGS_use_merge) {
if (!FLAGS_use_txn) {
s = db_->Merge(write_opts, cfh, key, v);
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Merge(cfh, key, v);
if (s.ok()) {
Snapshots with user-specified timestamps (#9879) Summary: In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable. It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29. This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps. Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps. In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot` object is created with the last published sequence number of the super-version. You can see that the reader actually has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called, an arbitrarily long period of time may have already elapsed since the last write, which is when the last published sequence number is written. This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will ensure any two snapshots with timestamps should satisfy the following: ``` snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts ``` If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create a snapshot with associated timestamp. Code example ```cpp // Create a timestamped snapshot when committing transaction. txn->SetCommitTimestamp(100); txn->SetSnapshotOnNextOperation(); txn->Commit(); // A wrapper API for convenience Status Transaction::CommitAndTryCreateSnapshot( std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts, std::shared_ptr<const Snapshot>* ret); // Create a timestamped snapshot if caller guarantees no concurrent writes std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100); ``` The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp. ```cpp // Return the timestamped snapshot correponding to given timestamp. If ts is // kMaxTxnTimestamp, then we return the latest timestamped snapshot if present. // Othersise, we return the snapshot whose timestamp is equal to `ts`. If no // such snapshot exists, then we return null. std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const; // Return the latest timestamped snapshot if present. std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const; ``` We also provide two additional APIs for stats collection and reporting purposes. ```cpp Status TransactionDB::GetAllTimestampedSnapshots( std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; // Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`. Status TransactionDB::GetTimestampedSnapshots( TxnTimestamp ts_lb, TxnTimestamp ts_ub, std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; ``` To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release timestamped snapshots whose timestamps are older than or equal to a given threshold. ```cpp void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts); ``` Before shutdown, RocksDB will release all timestamped snapshots. Comparison with user-defined timestamp and how they can be combined: User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile mapping between snapshots (sequence numbers) and timestamps. Different internal keys with the same user key but different timestamps will be treated as different by compaction, thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection. In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent). The timestamped snapshot supports the semantics of reading at an exact point in time. Timestamped snapshots can also be used with user-defined timestamp. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879 Test Plan: ``` make check TEST_TMPDIR=/dev/shm make crash_test_with_txn ``` Reviewed By: siying Differential Revision: D35783919 Pulled By: riversand963 fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
2022-06-10 23:07:03 +00:00
s = CommitTxn(txn, thread);
}
}
#endif
}
} else {
if (!FLAGS_use_txn) {
Revise APIs related to user-defined timestamp (#8946) Summary: ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`. Namely, `WriteOptions` should not include information about "what-to-write", but should just include information about "how-to-write". According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore, this PR removes `WriteOptions::timestamp` for compliance. After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and `SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity made me reconsider doing it in another PR (maybe). For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take extra `timestamp` information when writing to `WriteBatch`es. These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list. Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to `WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps allocated already and multiple timestamps can be updated. The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp size of the default column family. This will be used to allocate space when calling APIs that do not specify a column family handle. Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing some assertions about timestamp to returning Status code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946 Test Plan: make check ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8 ./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0 Make sure there is no perf regression by running the following ``` ./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom ``` Before this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s ``` After this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s ``` Reviewed By: ltamasi Differential Revision: D33721359 Pulled By: riversand963 fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 06:17:46 +00:00
if (FLAGS_user_timestamp_size == 0) {
s = db_->Put(write_opts, cfh, key, v);
} else {
s = db_->Put(write_opts, cfh, key, write_ts, v);
}
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Put(cfh, key, v);
if (s.ok()) {
Snapshots with user-specified timestamps (#9879) Summary: In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable. It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29. This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps. Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps. In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot` object is created with the last published sequence number of the super-version. You can see that the reader actually has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called, an arbitrarily long period of time may have already elapsed since the last write, which is when the last published sequence number is written. This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will ensure any two snapshots with timestamps should satisfy the following: ``` snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts ``` If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create a snapshot with associated timestamp. Code example ```cpp // Create a timestamped snapshot when committing transaction. txn->SetCommitTimestamp(100); txn->SetSnapshotOnNextOperation(); txn->Commit(); // A wrapper API for convenience Status Transaction::CommitAndTryCreateSnapshot( std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts, std::shared_ptr<const Snapshot>* ret); // Create a timestamped snapshot if caller guarantees no concurrent writes std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100); ``` The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp. ```cpp // Return the timestamped snapshot correponding to given timestamp. If ts is // kMaxTxnTimestamp, then we return the latest timestamped snapshot if present. // Othersise, we return the snapshot whose timestamp is equal to `ts`. If no // such snapshot exists, then we return null. std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const; // Return the latest timestamped snapshot if present. std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const; ``` We also provide two additional APIs for stats collection and reporting purposes. ```cpp Status TransactionDB::GetAllTimestampedSnapshots( std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; // Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`. Status TransactionDB::GetTimestampedSnapshots( TxnTimestamp ts_lb, TxnTimestamp ts_ub, std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; ``` To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release timestamped snapshots whose timestamps are older than or equal to a given threshold. ```cpp void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts); ``` Before shutdown, RocksDB will release all timestamped snapshots. Comparison with user-defined timestamp and how they can be combined: User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile mapping between snapshots (sequence numbers) and timestamps. Different internal keys with the same user key but different timestamps will be treated as different by compaction, thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection. In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent). The timestamped snapshot supports the semantics of reading at an exact point in time. Timestamped snapshots can also be used with user-defined timestamp. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879 Test Plan: ``` make check TEST_TMPDIR=/dev/shm make crash_test_with_txn ``` Reviewed By: siying Differential Revision: D35783919 Pulled By: riversand963 fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
2022-06-10 23:07:03 +00:00
s = CommitTxn(txn, thread);
}
}
#endif
}
}
shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
} else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
std::terminate();
}
} else {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
std::terminate();
}
}
thread->stats.AddBytesForWrites(1, sz);
PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
sz);
return s;
}
Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& /* lock */) override {
int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0];
auto shared = thread->shared;
// OPERATION delete
std::string write_ts_str = GetNowNanos();
Slice write_ts = write_ts_str;
std::string key_str = Key(rand_key);
Slice key = key_str;
auto cfh = column_families_[rand_column_family];
// Use delete if the key may be overwritten and a single deletion
// otherwise.
Status s;
if (shared->AllowsOverwrite(rand_key)) {
shared->Delete(rand_column_family, rand_key, true /* pending */);
if (!FLAGS_use_txn) {
Revise APIs related to user-defined timestamp (#8946) Summary: ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`. Namely, `WriteOptions` should not include information about "what-to-write", but should just include information about "how-to-write". According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore, this PR removes `WriteOptions::timestamp` for compliance. After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and `SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity made me reconsider doing it in another PR (maybe). For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take extra `timestamp` information when writing to `WriteBatch`es. These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list. Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to `WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps allocated already and multiple timestamps can be updated. The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp size of the default column family. This will be used to allocate space when calling APIs that do not specify a column family handle. Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing some assertions about timestamp to returning Status code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946 Test Plan: make check ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8 ./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0 Make sure there is no perf regression by running the following ``` ./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom ``` Before this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s ``` After this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s ``` Reviewed By: ltamasi Differential Revision: D33721359 Pulled By: riversand963 fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 06:17:46 +00:00
if (FLAGS_user_timestamp_size == 0) {
s = db_->Delete(write_opts, cfh, key);
} else {
s = db_->Delete(write_opts, cfh, key, write_ts);
}
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Delete(cfh, key);
if (s.ok()) {
Snapshots with user-specified timestamps (#9879) Summary: In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable. It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29. This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps. Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps. In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot` object is created with the last published sequence number of the super-version. You can see that the reader actually has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called, an arbitrarily long period of time may have already elapsed since the last write, which is when the last published sequence number is written. This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will ensure any two snapshots with timestamps should satisfy the following: ``` snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts ``` If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create a snapshot with associated timestamp. Code example ```cpp // Create a timestamped snapshot when committing transaction. txn->SetCommitTimestamp(100); txn->SetSnapshotOnNextOperation(); txn->Commit(); // A wrapper API for convenience Status Transaction::CommitAndTryCreateSnapshot( std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts, std::shared_ptr<const Snapshot>* ret); // Create a timestamped snapshot if caller guarantees no concurrent writes std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100); ``` The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp. ```cpp // Return the timestamped snapshot correponding to given timestamp. If ts is // kMaxTxnTimestamp, then we return the latest timestamped snapshot if present. // Othersise, we return the snapshot whose timestamp is equal to `ts`. If no // such snapshot exists, then we return null. std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const; // Return the latest timestamped snapshot if present. std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const; ``` We also provide two additional APIs for stats collection and reporting purposes. ```cpp Status TransactionDB::GetAllTimestampedSnapshots( std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; // Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`. Status TransactionDB::GetTimestampedSnapshots( TxnTimestamp ts_lb, TxnTimestamp ts_ub, std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; ``` To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release timestamped snapshots whose timestamps are older than or equal to a given threshold. ```cpp void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts); ``` Before shutdown, RocksDB will release all timestamped snapshots. Comparison with user-defined timestamp and how they can be combined: User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile mapping between snapshots (sequence numbers) and timestamps. Different internal keys with the same user key but different timestamps will be treated as different by compaction, thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection. In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent). The timestamped snapshot supports the semantics of reading at an exact point in time. Timestamped snapshots can also be used with user-defined timestamp. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879 Test Plan: ``` make check TEST_TMPDIR=/dev/shm make crash_test_with_txn ``` Reviewed By: siying Differential Revision: D35783919 Pulled By: riversand963 fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
2022-06-10 23:07:03 +00:00
s = CommitTxn(txn, thread);
}
}
#endif
}
shared->Delete(rand_column_family, rand_key, false /* pending */);
thread->stats.AddDeletes(1);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (!is_db_stopped_ &&
s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
} else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate();
}
} else {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate();
}
}
} else {
shared->SingleDelete(rand_column_family, rand_key, true /* pending */);
if (!FLAGS_use_txn) {
Revise APIs related to user-defined timestamp (#8946) Summary: ajkr reminded me that we have a rule of not including per-kv related data in `WriteOptions`. Namely, `WriteOptions` should not include information about "what-to-write", but should just include information about "how-to-write". According to this rule, `WriteOptions::timestamp` (experimental) is clearly a violation. Therefore, this PR removes `WriteOptions::timestamp` for compliance. After the removal, we need to pass timestamp info via another set of APIs. This PR proposes a set of overloaded functions `Put(write_opts, key, value, ts)`, `Delete(write_opts, key, ts)`, and `SingleDelete(write_opts, key, ts)`. Planned to add `Write(write_opts, batch, ts)`, but its complexity made me reconsider doing it in another PR (maybe). For better checking and returning error early, we also add a new set of APIs to `WriteBatch` that take extra `timestamp` information when writing to `WriteBatch`es. These set of APIs in `WriteBatchWithIndex` are currently not supported, and are on our TODO list. Removed `WriteBatch::AssignTimestamps()` and renamed `WriteBatch::AssignTimestamp()` to `WriteBatch::UpdateTimestamps()` since this method require that all keys have space for timestamps allocated already and multiple timestamps can be updated. The constructor of `WriteBatch` now takes a fourth argument `default_cf_ts_sz` which is the timestamp size of the default column family. This will be used to allocate space when calling APIs that do not specify a column family handle. Also, updated `DB::Get()`, `DB::MultiGet()`, `DB::NewIterator()`, `DB::NewIterators()` methods, replacing some assertions about timestamp to returning Status code. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8946 Test Plan: make check ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readseq,deleterandom -user_timestamp_size=8 ./db_stress --user_timestamp_size=8 -nooverwritepercent=0 -test_secondary=0 -secondary_catch_up_one_in=0 -continuous_verification_interval=0 Make sure there is no perf regression by running the following ``` ./db_bench_opt -db=/dev/shm/rocksdb -use_existing_db=0 -level0_stop_writes_trigger=256 -level0_slowdown_writes_trigger=256 -level0_file_num_compaction_trigger=256 -disable_wal=1 -duration=10 -benchmarks=fillrandom ``` Before this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.831 micros/op 546235 ops/sec; 60.4 MB/s ``` After this PR ``` DB path: [/dev/shm/rocksdb] fillrandom : 1.820 micros/op 549404 ops/sec; 60.8 MB/s ``` Reviewed By: ltamasi Differential Revision: D33721359 Pulled By: riversand963 fbshipit-source-id: c131561534272c120ffb80711d42748d21badf09
2022-02-02 06:17:46 +00:00
if (FLAGS_user_timestamp_size == 0) {
s = db_->SingleDelete(write_opts, cfh, key);
} else {
s = db_->SingleDelete(write_opts, cfh, key, write_ts);
}
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->SingleDelete(cfh, key);
if (s.ok()) {
Snapshots with user-specified timestamps (#9879) Summary: In RocksDB, keys are associated with (internal) sequence numbers which denote when the keys are written to the database. Sequence numbers in different RocksDB instances are unrelated, thus not comparable. It is nice if we can associate sequence numbers with their corresponding actual timestamps. One thing we can do is to support user-defined timestamp, which allows the applications to specify the format of custom timestamps and encode a timestamp with each key. More details can be found at https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp-%28Experimental%29. This PR provides a different but complementary approach. We can associate rocksdb snapshots (defined in https://github.com/facebook/rocksdb/blob/7.2.fb/include/rocksdb/snapshot.h#L20) with **user-specified** timestamps. Since a snapshot is essentially an object representing a sequence number, this PR establishes a bi-directional mapping between sequence numbers and timestamps. In the past, snapshots are usually taken by readers. The current super-version is grabbed, and a `rocksdb::Snapshot` object is created with the last published sequence number of the super-version. You can see that the reader actually has no good idea of what timestamp to assign to this snapshot, because by the time the `GetSnapshot()` is called, an arbitrarily long period of time may have already elapsed since the last write, which is when the last published sequence number is written. This observation motivates the creation of "timestamped" snapshots on the write path. Currently, this functionality is exposed only to the layer of `TransactionDB`. Application can tell RocksDB to create a snapshot when a transaction commits, effectively associating the last sequence number with a timestamp. It is also assumed that application will ensure any two snapshots with timestamps should satisfy the following: ``` snapshot1.seq < snapshot2.seq iff. snapshot1.ts < snapshot2.ts ``` If the application can guarantee that when a reader takes a timestamped snapshot, there is no active writes going on in the database, then we also allow the user to use a new API `TransactionDB::CreateTimestampedSnapshot()` to create a snapshot with associated timestamp. Code example ```cpp // Create a timestamped snapshot when committing transaction. txn->SetCommitTimestamp(100); txn->SetSnapshotOnNextOperation(); txn->Commit(); // A wrapper API for convenience Status Transaction::CommitAndTryCreateSnapshot( std::shared_ptr<TransactionNotifier> notifier, TxnTimestamp ts, std::shared_ptr<const Snapshot>* ret); // Create a timestamped snapshot if caller guarantees no concurrent writes std::pair<Status, std::shared_ptr<const Snapshot>> snapshot = txn_db->CreateTimestampedSnapshot(100); ``` The snapshots created in this way will be managed by RocksDB with ref-counting and potentially shared with other readers. We provide the following APIs for readers to retrieve a snapshot given a timestamp. ```cpp // Return the timestamped snapshot correponding to given timestamp. If ts is // kMaxTxnTimestamp, then we return the latest timestamped snapshot if present. // Othersise, we return the snapshot whose timestamp is equal to `ts`. If no // such snapshot exists, then we return null. std::shared_ptr<const Snapshot> TransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const; // Return the latest timestamped snapshot if present. std::shared_ptr<const Snapshot> TransactionDB::GetLatestTimestampedSnapshot() const; ``` We also provide two additional APIs for stats collection and reporting purposes. ```cpp Status TransactionDB::GetAllTimestampedSnapshots( std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; // Return timestamped snapshots whose timestamps fall in [ts_lb, ts_ub) and store them in `snapshots`. Status TransactionDB::GetTimestampedSnapshots( TxnTimestamp ts_lb, TxnTimestamp ts_ub, std::vector<std::shared_ptr<const Snapshot>>& snapshots) const; ``` To prevent the number of timestamped snapshots from growing infinitely, we provide the following API to release timestamped snapshots whose timestamps are older than or equal to a given threshold. ```cpp void TransactionDB::ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts); ``` Before shutdown, RocksDB will release all timestamped snapshots. Comparison with user-defined timestamp and how they can be combined: User-defined timestamp persists every key with a timestamp, while timestamped snapshots maintain a volatile mapping between snapshots (sequence numbers) and timestamps. Different internal keys with the same user key but different timestamps will be treated as different by compaction, thus a newer version will not hide older versions (with smaller timestamps) unless they are eligible for garbage collection. In contrast, taking a timestamped snapshot at a certain sequence number and timestamp prevents all the keys visible in this snapshot from been dropped by compaction. Here, visible means (seq < snapshot and most recent). The timestamped snapshot supports the semantics of reading at an exact point in time. Timestamped snapshots can also be used with user-defined timestamp. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9879 Test Plan: ``` make check TEST_TMPDIR=/dev/shm make crash_test_with_txn ``` Reviewed By: siying Differential Revision: D35783919 Pulled By: riversand963 fbshipit-source-id: 586ad905e169189e19d3bfc0cb0177a7239d1bd4
2022-06-10 23:07:03 +00:00
s = CommitTxn(txn, thread);
}
}
#endif
}
shared->SingleDelete(rand_column_family, rand_key, false /* pending */);
thread->stats.AddSingleDeletes(1);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (!is_db_stopped_ &&
s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
} else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
std::terminate();
}
} else {
fprintf(stderr, "single delete error: %s\n", s.ToString().c_str());
std::terminate();
}
}
}
return s;
}
Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) override {
// OPERATION delete range
std::vector<std::unique_ptr<MutexLock>> range_locks;
// delete range does not respect disallowed overwrites. the keys for
// which overwrites are disallowed are randomly distributed so it
// could be expensive to find a range where each key allows
// overwrites.
int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0];
auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey();
if (rand_key > max_key - FLAGS_range_deletion_width) {
lock.reset();
rand_key =
thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
} else {
range_locks.emplace_back(std::move(lock));
}
for (int j = 1; j < FLAGS_range_deletion_width; ++j) {
if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key + j)));
}
}
shared->DeleteRange(rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width,
true /* pending */);
std::string keystr = Key(rand_key);
Slice key = keystr;
auto cfh = column_families_[rand_column_family];
std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
Slice end_key = end_keystr;
Status s = db_->DeleteRange(write_opts, cfh, key, end_key);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
is_db_stopped_ = true;
} else if (!is_db_stopped_ ||
s.severity() < Status::Severity::kFatalError) {
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
std::terminate();
}
} else {
fprintf(stderr, "delete range error: %s\n", s.ToString().c_str());
std::terminate();
}
}
int covered = shared->DeleteRange(rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width,
false /* pending */);
thread->stats.AddRangeDeletions(1);
thread->stats.AddCoveredByRangeDeletions(covered);
return s;
}
#ifdef ROCKSDB_LITE
void TestIngestExternalFile(
ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */,
std::unique_ptr<MutexLock>& /* lock */) override {
assert(false);
fprintf(stderr,
"RocksDB lite does not support "
"TestIngestExternalFile\n");
std::terminate();
}
#else
void TestIngestExternalFile(ThreadState* thread,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) override {
const std::string sst_filename =
FLAGS_db + "/." + std::to_string(thread->tid) + ".sst";
Status s;
if (db_stress_env->FileExists(sst_filename).ok()) {
// Maybe we terminated abnormally before, so cleanup to give this file
// ingestion a clean slate
s = db_stress_env->DeleteFile(sst_filename);
}
SstFileWriter sst_file_writer(EnvOptions(options_), options_);
if (s.ok()) {
s = sst_file_writer.Open(sst_filename);
}
int64_t key_base = rand_keys[0];
int column_family = rand_column_families[0];
std::vector<std::unique_ptr<MutexLock>> range_locks;
range_locks.reserve(FLAGS_ingest_external_file_width);
std::vector<int64_t> keys;
keys.reserve(FLAGS_ingest_external_file_width);
std::vector<uint32_t> values;
values.reserve(FLAGS_ingest_external_file_width);
SharedState* shared = thread->shared;
assert(FLAGS_nooverwritepercent < 100);
// Grab locks, set pending state on expected values, and add keys
for (int64_t key = key_base;
s.ok() && key < shared->GetMaxKey() &&
static_cast<int32_t>(keys.size()) < FLAGS_ingest_external_file_width;
++key) {
if (key == key_base) {
range_locks.emplace_back(std::move(lock));
} else if ((key & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(column_family, key)));
}
if (!shared->AllowsOverwrite(key)) {
// We could alternatively include `key` on the condition its current
// value is `DELETION_SENTINEL`.
continue;
}
keys.push_back(key);
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
values.push_back(value_base);
shared->Put(column_family, key, value_base, true /* pending */);
char value[100];
size_t value_len = GenerateValue(value_base, value, sizeof(value));
auto key_str = Key(key);
s = sst_file_writer.Put(Slice(key_str), Slice(value, value_len));
}
if (s.ok()) {
s = sst_file_writer.Finish();
}
if (s.ok()) {
s = db_->IngestExternalFile(column_families_[column_family],
{sst_filename}, IngestExternalFileOptions());
}
if (!s.ok()) {
fprintf(stderr, "file ingestion error: %s\n", s.ToString().c_str());
std::terminate();
}
for (size_t i = 0; i < keys.size(); ++i) {
shared->Put(column_family, keys[i], values[i], false /* pending */);
}
}
#endif // ROCKSDB_LITE
Add Iterator test against expected state to stress test (#10538) Summary: As mentioned in https://github.com/facebook/rocksdb/pull/5506#issuecomment-506021913, `db_stress` does not have much verification for iterator correctness. It has a `TestIterate()` function, but that is mainly for comparing results between two iterators, one with `total_order_seek` and the other optionally sets auto_prefix, upper/lower bounds. Commit 49a0581ad2462e31aa3f768afa769e0d33390f33 added a new `TestIterateAgainstExpected()` function that compares iterator against expected state. It locks a range of keys, creates an iterator, does a random sequence of `Next/Prev` and compares against expected state. This PR is based on that commit, the main changes include some logs (for easier debugging if a test fails), a forward and backward scan to cover the entire locked key range, and a flag for optionally turning on this version of Iterator testing. Added constraint that the checks against expected state in `TestIterateAgainstExpected()` and in `TestGet()` are only turned on when `--skip_verifydb` flag is not set. Remove the change log introduced in https://github.com/facebook/rocksdb/issues/10553. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10538 Test Plan: Run `db_stress` with `--verify_iterator_with_expected_state_one_in=1`, and a large `--iterpercent` and `--num_iterations`. Checked `op_logs` manually to ensure expected coverage. Tweaked part of the code in https://github.com/facebook/rocksdb/issues/10449 and stress test was able to catch it. - internally run various flavor of crash test Reviewed By: ajkr Differential Revision: D38847269 Pulled By: cbi42 fbshipit-source-id: 8b4402a9bba9f6cfa08051943cd672579d489599
2022-08-24 21:59:50 +00:00
// Given a key K, this creates an iterator which scans the range
// [K, K + FLAGS_num_iterations) forward and backward.
// Then does a random sequence of Next/Prev operations.
Status TestIterateAgainstExpected(
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& lock) override {
// Lock the whole range over which we might iterate to ensure it doesn't
// change under us.
std::vector<std::unique_ptr<MutexLock>> range_locks;
int64_t lb = rand_keys[0];
int rand_column_family = rand_column_families[0];
auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey();
if (static_cast<uint64_t>(lb) > max_key - FLAGS_num_iterations) {
lock.reset();
lb = thread->rand.Next() % (max_key - FLAGS_num_iterations + 1);
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, lb)));
} else {
range_locks.emplace_back(std::move(lock));
}
for (int j = 1; j < static_cast<int>(FLAGS_num_iterations); ++j) {
if (((lb + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, lb + j)));
}
}
int64_t ub = lb + FLAGS_num_iterations;
// Locks acquired for [lb, ub)
ReadOptions readoptscopy(read_opts);
readoptscopy.total_order_seek = true;
auto cfh = column_families_[rand_column_family];
std::string op_logs;
std::unique_ptr<Iterator> iter(db_->NewIterator(readoptscopy, cfh));
auto check_no_key_in_range = [&](int64_t start, int64_t end) {
for (auto j = std::max(start, lb); j < std::min(end, ub); ++j) {
auto expected_value =
shared->Get(rand_column_family, static_cast<int64_t>(j));
if (expected_value != shared->DELETION_SENTINEL &&
expected_value != shared->UNKNOWN_SENTINEL) {
// Fail fast to preserve the DB state.
thread->shared->SetVerificationFailure();
if (iter->Valid()) {
fprintf(stderr,
"Expected state has key %s, iterator is at key %s\n",
Slice(Key(j)).ToString(true).c_str(),
iter->key().ToString(true).c_str());
} else {
fprintf(stderr, "Expected state has key %s, iterator is invalid\n",
Slice(Key(j)).ToString(true).c_str());
}
fprintf(stderr, "Column family: %s, op_logs: %s\n",
cfh->GetName().c_str(), op_logs.c_str());
thread->stats.AddErrors(1);
return false;
}
}
return true;
};
// Forward and backward scan to ensure we cover the entire range [lb, ub).
// The random sequence Next and Prev test below tends to be very short
// ranged.
int64_t last_key = lb - 1;
std::string key_str = Key(lb);
iter->Seek(Slice(key_str));
op_logs += "S " + Slice(key_str).ToString(true) + " ";
uint64_t curr;
while (true) {
if (!iter->Valid()) {
if (!iter->status().ok()) {
thread->shared->SetVerificationFailure();
fprintf(stderr, "TestIterate against expected state error: %s\n",
iter->status().ToString().c_str());
fprintf(stderr, "Column family: %s, op_logs: %s\n",
cfh->GetName().c_str(), op_logs.c_str());
thread->stats.AddErrors(1);
return iter->status();
}
if (!check_no_key_in_range(last_key + 1, static_cast<int64_t>(ub))) {
// error reported in check_no_key_in_range()
return Status::OK();
}
break;
}
// iter is valid, the range (last_key, current key) was skipped
GetIntVal(iter->key().ToString(), &curr);
if (!check_no_key_in_range(last_key + 1, static_cast<int64_t>(curr))) {
return Status::OK();
}
last_key = static_cast<int64_t>(curr);
if (last_key >= ub - 1) {
break;
}
iter->Next();
op_logs += "N";
}
// backward scan
key_str = Key(ub - 1);
iter->SeekForPrev(Slice(key_str));
op_logs += " SFP " + Slice(key_str).ToString(true) + " ";
last_key = ub;
while (true) {
if (!iter->Valid()) {
if (!iter->status().ok()) {
thread->shared->SetVerificationFailure();
fprintf(stderr, "TestIterate against expected state error: %s\n",
iter->status().ToString().c_str());
fprintf(stderr, "Column family: %s, op_logs: %s\n",
cfh->GetName().c_str(), op_logs.c_str());
thread->stats.AddErrors(1);
return iter->status();
}
if (!check_no_key_in_range(lb, last_key)) {
return Status::OK();
}
break;
}
// the range (current key, last key) was skipped
GetIntVal(iter->key().ToString(), &curr);
if (!check_no_key_in_range(static_cast<int64_t>(curr + 1), last_key)) {
return Status::OK();
}
last_key = static_cast<int64_t>(curr);
if (last_key <= lb) {
break;
}
iter->Prev();
op_logs += "P";
}
// start from middle of [lb, ub) otherwise it is easy to iterate out of
// locked range
int64_t mid = lb + static_cast<int64_t>(FLAGS_num_iterations / 2);
key_str = Key(mid);
Slice key = key_str;
if (thread->rand.OneIn(2)) {
iter->Seek(key);
op_logs += " S " + key.ToString(true) + " ";
if (!iter->Valid() && iter->status().ok()) {
if (!check_no_key_in_range(mid, ub)) {
return Status::OK();
}
}
} else {
iter->SeekForPrev(key);
op_logs += " SFP " + key.ToString(true) + " ";
if (!iter->Valid() && iter->status().ok()) {
// iterator says nothing <= mid
if (!check_no_key_in_range(lb, mid + 1)) {
return Status::OK();
}
}
}
for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
GetIntVal(iter->key().ToString(), &curr);
if (curr < static_cast<uint64_t>(lb)) {
iter->Next();
op_logs += "N";
} else if (curr >= static_cast<uint64_t>(ub)) {
iter->Prev();
op_logs += "P";
} else {
uint32_t expected_value =
shared->Get(rand_column_family, static_cast<int64_t>(curr));
if (expected_value == shared->DELETION_SENTINEL) {
// Fail fast to preserve the DB state.
thread->shared->SetVerificationFailure();
fprintf(stderr, "Iterator has key %s, but expected state does not.\n",
iter->key().ToString(true).c_str());
fprintf(stderr, "Column family: %s, op_logs: %s\n",
cfh->GetName().c_str(), op_logs.c_str());
thread->stats.AddErrors(1);
break;
}
if (thread->rand.OneIn(2)) {
iter->Next();
op_logs += "N";
if (!iter->Valid()) {
break;
}
uint64_t next;
GetIntVal(iter->key().ToString(), &next);
if (!check_no_key_in_range(static_cast<int64_t>(curr + 1),
static_cast<int64_t>(next))) {
return Status::OK();
}
} else {
iter->Prev();
op_logs += "P";
if (!iter->Valid()) {
break;
}
uint64_t prev;
GetIntVal(iter->key().ToString(), &prev);
if (!check_no_key_in_range(static_cast<int64_t>(prev + 1),
static_cast<int64_t>(curr))) {
return Status::OK();
}
}
}
}
if (!iter->status().ok()) {
thread->shared->SetVerificationFailure();
fprintf(stderr, "TestIterate against expected state error: %s\n",
iter->status().ToString().c_str());
fprintf(stderr, "Column family: %s, op_logs: %s\n",
cfh->GetName().c_str(), op_logs.c_str());
thread->stats.AddErrors(1);
return iter->status();
}
thread->stats.AddIterations(1);
return Status::OK();
}
bool VerifyOrSyncValue(int cf, int64_t key, const ReadOptions& /*opts*/,
SharedState* shared, const std::string& value_from_db,
const Status& s, bool strict = false) const {
if (shared->HasVerificationFailedYet()) {
return false;
}
// compare value_from_db with the value in the shared state
uint32_t value_base = shared->Get(cf, key);
if (value_base == SharedState::UNKNOWN_SENTINEL) {
if (s.ok()) {
// Value exists in db, update state to reflect that
Slice slice(value_from_db);
value_base = GetValueBase(slice);
shared->Put(cf, key, value_base, false);
} else if (s.IsNotFound()) {
// Value doesn't exist in db, update state to reflect that
shared->SingleDelete(cf, key, false);
}
return true;
}
if (value_base == SharedState::DELETION_SENTINEL && !strict) {
return true;
}
if (s.ok()) {
char value[kValueMaxLen];
if (value_base == SharedState::DELETION_SENTINEL) {
VerificationAbort(shared, "Unexpected value found", cf, key);
return false;
}
size_t sz = GenerateValue(value_base, value, sizeof(value));
if (value_from_db.length() != sz) {
VerificationAbort(shared, "Length of value read is not equal", cf, key);
return false;
}
if (memcmp(value_from_db.data(), value, sz) != 0) {
VerificationAbort(shared, "Contents of value read don't match", cf,
key);
return false;
}
} else {
if (value_base != SharedState::DELETION_SENTINEL) {
VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
return false;
}
}
return true;
}
#ifndef ROCKSDB_LITE
void PrepareTxnDbOptions(SharedState* shared,
TransactionDBOptions& txn_db_opts) override {
txn_db_opts.rollback_deletion_type_callback =
[shared](TransactionDB*, ColumnFamilyHandle*, const Slice& key) {
assert(shared);
uint64_t key_num = 0;
bool ok = GetIntVal(key.ToString(), &key_num);
assert(ok);
(void)ok;
return !shared->AllowsOverwrite(key_num);
};
}
#endif // ROCKSDB_LITE
};
StressTest* CreateNonBatchedOpsStressTest() {
return new NonBatchedOpsStressTest();
}
} // namespace ROCKSDB_NAMESPACE
#endif // GFLAGS