Improve stability of db_stress

Summary:
Currently, whenever DB Verification fails we bail out by calling `exit(1)`. This is kind of bad since it causes unclean shutdown and spew of error log messages like:

    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument
    05:03:27 pthread lock: Invalid argument

This diff adds a new parameter that is set to true when verification fails. It can then use the parameter to bail out safely.

Test Plan: Casued artificail failure. Verified that exit was clean.

Reviewers: dhruba, haobo, ljin

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D18243
This commit is contained in:
Igor Canadi 2014-04-24 09:22:58 -04:00
parent d8fe006056
commit 2413a06c7b
1 changed files with 61 additions and 23 deletions

View File

@ -651,6 +651,10 @@ class SharedState {
return start_verify_; return start_verify_;
} }
void SetVerificationFailure() { verification_failure_.store(true); }
bool HasVerificationFailedYet() { return verification_failure_.load(); }
port::Mutex* GetMutexForKey(int cf, long key) { port::Mutex* GetMutexForKey(int cf, long key) {
return &key_locks_[cf][key >> log2_keys_per_lock_]; return &key_locks_[cf][key >> log2_keys_per_lock_];
} }
@ -695,6 +699,7 @@ class SharedState {
bool start_; bool start_;
bool start_verify_; bool start_verify_;
StressTest* stress_test_; StressTest* stress_test_;
std::atomic<bool> verification_failure_;
std::vector<std::vector<uint32_t>> values_; std::vector<std::vector<uint32_t>> values_;
std::vector<std::vector<port::Mutex>> key_locks_; std::vector<std::vector<port::Mutex>> key_locks_;
@ -752,7 +757,7 @@ class StressTest {
delete filter_policy_; delete filter_policy_;
} }
void Run() { bool Run() {
PrintEnv(); PrintEnv();
Open(); Open();
SharedState shared(this); SharedState shared(this);
@ -814,6 +819,12 @@ class StressTest {
FLAGS_env->TimeToString((uint64_t) now/1000000).c_str()); FLAGS_env->TimeToString((uint64_t) now/1000000).c_str());
} }
PrintStatistics(); PrintStatistics();
if (shared.HasVerificationFailedYet()) {
printf("Verification failed :(\n");
return false;
}
return true;
} }
private: private:
@ -1101,6 +1112,9 @@ class StressTest {
thread->stats.Start(); thread->stats.Start();
for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) { if (i != 0 && (i % (FLAGS_ops_per_thread / (FLAGS_reopen + 1))) == 0) {
{ {
thread->stats.FinishedSingleOp(); thread->stats.FinishedSingleOp();
@ -1211,8 +1225,10 @@ class StressTest {
std::string keystr2 = Key(rand_key); std::string keystr2 = Key(rand_key);
Slice k = keystr2; Slice k = keystr2;
Status s = db_->Get(read_opts, column_family, k, &from_db); Status s = db_->Get(read_opts, column_family, k, &from_db);
VerifyValue(rand_column_family, rand_key, read_opts, if (VerifyValue(rand_column_family, rand_key, read_opts,
*(thread->shared), from_db, s, true); thread->shared, from_db, s, true) == false) {
break;
}
} }
thread->shared->Put(rand_column_family, rand_key, value_base); thread->shared->Put(rand_column_family, rand_key, value_base);
if (FLAGS_use_merge) { if (FLAGS_use_merge) {
@ -1246,15 +1262,18 @@ class StressTest {
void VerifyDb(ThreadState* thread) const { void VerifyDb(ThreadState* thread) const {
ReadOptions options(FLAGS_verify_checksum, true); ReadOptions options(FLAGS_verify_checksum, true);
const SharedState& shared = *(thread->shared); auto shared = thread->shared;
static const long max_key = shared.GetMaxKey(); static const long max_key = shared->GetMaxKey();
static const long keys_per_thread = max_key / shared.GetNumThreads(); static const long keys_per_thread = max_key / shared->GetNumThreads();
long start = keys_per_thread * thread->tid; long start = keys_per_thread * thread->tid;
long end = start + keys_per_thread; long end = start + keys_per_thread;
if (thread->tid == shared.GetNumThreads() - 1) { if (thread->tid == shared->GetNumThreads() - 1) {
end = max_key; end = max_key;
} }
for (size_t cf = 0; cf < column_families_.size(); ++cf) { for (size_t cf = 0; cf < column_families_.size(); ++cf) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
if (!thread->rand.OneIn(2)) { if (!thread->rand.OneIn(2)) {
// Use iterator to verify this range // Use iterator to verify this range
options.prefix_seek = FLAGS_prefix_size > 0; options.prefix_seek = FLAGS_prefix_size > 0;
@ -1262,6 +1281,9 @@ class StressTest {
db_->NewIterator(options, column_families_[cf])); db_->NewIterator(options, column_families_[cf]));
iter->Seek(Key(start)); iter->Seek(Key(start));
for (long i = start; i < end; i++) { for (long i = start; i < end; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
// TODO(ljin): update "long" to uint64_t // TODO(ljin): update "long" to uint64_t
// Reseek when the prefix changes // Reseek when the prefix changes
if (i % (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size)) == if (i % (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size)) ==
@ -1279,7 +1301,7 @@ class StressTest {
from_db = iter->value().ToString(); from_db = iter->value().ToString();
iter->Next(); iter->Next();
} else if (iter->key().compare(k) < 0) { } else if (iter->key().compare(k) < 0) {
VerificationAbort("An out of range key was found", cf, i); VerificationAbort(shared, "An out of range key was found", cf, i);
} }
} else { } else {
// The iterator found no value for the key in question, so do not // The iterator found no value for the key in question, so do not
@ -1294,6 +1316,9 @@ class StressTest {
} else { } else {
// Use Get to verify this range // Use Get to verify this range
for (long i = start; i < end; i++) { for (long i = start; i < end; i++) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}
std::string from_db; std::string from_db;
std::string keystr = Key(i); std::string keystr = Key(i);
Slice k = keystr; Slice k = keystr;
@ -1307,38 +1332,48 @@ class StressTest {
} }
} }
void VerificationAbort(std::string msg, int cf, long key) const { void VerificationAbort(SharedState* shared, std::string msg, int cf,
fprintf(stderr, "Verification failed for column family %d key %ld: %s\n", long key) const {
cf, key, msg.c_str()); printf("Verification failed for column family %d key %ld: %s\n", cf, key,
exit(1); msg.c_str());
shared->SetVerificationFailure();
} }
void VerifyValue(int cf, long key, const ReadOptions& opts, bool VerifyValue(int cf, long key, const ReadOptions& opts,
const SharedState& shared, const std::string& value_from_db, SharedState* shared, const std::string& value_from_db,
Status s, bool strict = false) const { Status s, bool strict = false) const {
if (shared->HasVerificationFailedYet()) {
return false;
}
// compare value_from_db with the value in the shared state // compare value_from_db with the value in the shared state
char value[100]; char value[100];
uint32_t value_base = shared.Get(cf, key); uint32_t value_base = shared->Get(cf, key);
if (value_base == SharedState::SENTINEL && !strict) { if (value_base == SharedState::SENTINEL && !strict) {
return; return true;
} }
if (s.ok()) { if (s.ok()) {
if (value_base == SharedState::SENTINEL) { if (value_base == SharedState::SENTINEL) {
VerificationAbort("Unexpected value found", cf, key); VerificationAbort(shared, "Unexpected value found", cf, key);
return false;
} }
size_t sz = GenerateValue(value_base, value, sizeof(value)); size_t sz = GenerateValue(value_base, value, sizeof(value));
if (value_from_db.length() != sz) { if (value_from_db.length() != sz) {
VerificationAbort("Length of value read is not equal", cf, key); VerificationAbort(shared, "Length of value read is not equal", cf, key);
return false;
} }
if (memcmp(value_from_db.data(), value, sz) != 0) { if (memcmp(value_from_db.data(), value, sz) != 0) {
VerificationAbort("Contents of value read don't match", cf, key); VerificationAbort(shared, "Contents of value read don't match", cf,
key);
return false;
} }
} else { } else {
if (value_base != SharedState::SENTINEL) { if (value_base != SharedState::SENTINEL) {
VerificationAbort("Value not found", cf, key); VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
return false;
} }
} }
return true;
} }
static void PrintKeyValue(int cf, uint32_t key, const char* value, static void PrintKeyValue(int cf, uint32_t key, const char* value,
@ -1693,6 +1728,9 @@ int main(int argc, char** argv) {
} }
rocksdb::StressTest stress; rocksdb::StressTest stress;
stress.Run(); if (stress.Run()) {
return 0; return 0;
} else {
return 1;
}
} }