mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
4428c76181
Summary: In leveled compaction, L0->L1 trivial move will allow more than one file to be moved in one compaction. This would allow L0 files to be moved down faster when data is loaded in sequential order, making slowdown or stop condition harder to hit. Also seek L0->L1 trivial move when only some files qualify. 1. We always try to find L0->L1 trivial move from the oldest files. Keep including newer files, until adding a new file won't trigger a trivial move 2. Modify the trivial move condition so that this compaction would be tagged as trivial move. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10188 Test Plan: See throughput improvements with db_bench with fast fillseq benchmark and small L0 files: ./db_bench_l0_move --benchmarks=fillseq --compression_type=lz4 --write_buffer_size=5000000 --num=100000000 --value_size=1000 -level_compaction_dynamic_level_bytes The throughput improved by about 50%. Stalling still happens though. Reviewed By: jay-zhuang Differential Revision: D37224743 fbshipit-source-id: 8958d97f22e12bdfc14d2e85930f6fa0070e9659
1591 lines
55 KiB
C++
1591 lines
55 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "rocksdb/options.h"
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include <fcntl.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/types.h>
|
|
|
|
#include <cinttypes>
|
|
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "db/db_test_util.h"
|
|
#include "db/log_format.h"
|
|
#include "db/version_set.h"
|
|
#include "file/filename.h"
|
|
#include "port/stack_trace.h"
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/table.h"
|
|
#include "rocksdb/utilities/transaction_db.h"
|
|
#include "rocksdb/write_batch.h"
|
|
#include "table/block_based/block_based_table_builder.h"
|
|
#include "table/meta_blocks.h"
|
|
#include "table/mock_table.h"
|
|
#include "test_util/testharness.h"
|
|
#include "test_util/testutil.h"
|
|
#include "util/cast_util.h"
|
|
#include "util/random.h"
|
|
#include "util/string_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
static constexpr int kValueSize = 1000;
|
|
namespace {
|
|
// A wrapper that allows injection of errors.
|
|
class ErrorEnv : public EnvWrapper {
|
|
public:
|
|
bool writable_file_error_;
|
|
int num_writable_file_errors_;
|
|
|
|
explicit ErrorEnv(Env* _target)
|
|
: EnvWrapper(_target),
|
|
writable_file_error_(false),
|
|
num_writable_file_errors_(0) {}
|
|
const char* Name() const override { return "ErrorEnv"; }
|
|
|
|
virtual Status NewWritableFile(const std::string& fname,
|
|
std::unique_ptr<WritableFile>* result,
|
|
const EnvOptions& soptions) override {
|
|
result->reset();
|
|
if (writable_file_error_) {
|
|
++num_writable_file_errors_;
|
|
return Status::IOError(fname, "fake error");
|
|
}
|
|
return target()->NewWritableFile(fname, result, soptions);
|
|
}
|
|
};
|
|
} // namespace
|
|
class CorruptionTest : public testing::Test {
|
|
public:
|
|
std::shared_ptr<Env> env_guard_;
|
|
ErrorEnv* env_;
|
|
std::string dbname_;
|
|
std::shared_ptr<Cache> tiny_cache_;
|
|
Options options_;
|
|
DB* db_;
|
|
|
|
CorruptionTest() {
|
|
// If LRU cache shard bit is smaller than 2 (or -1 which will automatically
|
|
// set it to 0), test SequenceNumberRecovery will fail, likely because of a
|
|
// bug in recovery code. Keep it 4 for now to make the test passes.
|
|
tiny_cache_ = NewLRUCache(100, 4);
|
|
Env* base_env = Env::Default();
|
|
EXPECT_OK(
|
|
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
|
|
EXPECT_NE(base_env, nullptr);
|
|
env_ = new ErrorEnv(base_env);
|
|
options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
|
|
options_.env = env_;
|
|
dbname_ = test::PerThreadDBPath(env_, "corruption_test");
|
|
Status s = DestroyDB(dbname_, options_);
|
|
EXPECT_OK(s);
|
|
|
|
db_ = nullptr;
|
|
options_.create_if_missing = true;
|
|
BlockBasedTableOptions table_options;
|
|
table_options.block_size_deviation = 0; // make unit test pass for now
|
|
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
|
Reopen();
|
|
options_.create_if_missing = false;
|
|
}
|
|
|
|
~CorruptionTest() override {
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->LoadDependency({});
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
delete db_;
|
|
db_ = nullptr;
|
|
if (getenv("KEEP_DB")) {
|
|
fprintf(stdout, "db is still at %s\n", dbname_.c_str());
|
|
} else {
|
|
Options opts;
|
|
opts.env = env_->target();
|
|
EXPECT_OK(DestroyDB(dbname_, opts));
|
|
}
|
|
delete env_;
|
|
}
|
|
|
|
void CloseDb() {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
}
|
|
|
|
Status TryReopen(Options* options = nullptr) {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
Options opt = (options ? *options : options_);
|
|
if (opt.env == Options().env) {
|
|
// If env is not overridden, replace it with ErrorEnv.
|
|
// Otherwise, the test already uses a non-default Env.
|
|
opt.env = env_;
|
|
}
|
|
opt.arena_block_size = 4096;
|
|
BlockBasedTableOptions table_options;
|
|
table_options.block_cache = tiny_cache_;
|
|
table_options.block_size_deviation = 0;
|
|
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
|
return DB::Open(opt, dbname_, &db_);
|
|
}
|
|
|
|
void Reopen(Options* options = nullptr) {
|
|
ASSERT_OK(TryReopen(options));
|
|
}
|
|
|
|
void RepairDB() {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
ASSERT_OK(::ROCKSDB_NAMESPACE::RepairDB(dbname_, options_));
|
|
}
|
|
|
|
void Build(int n, int start, int flush_every) {
|
|
std::string key_space, value_space;
|
|
WriteBatch batch;
|
|
for (int i = 0; i < n; i++) {
|
|
if (flush_every != 0 && i != 0 && i % flush_every == 0) {
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
}
|
|
//if ((i % 100) == 0) fprintf(stderr, "@ %d of %d\n", i, n);
|
|
Slice key = Key(i + start, &key_space);
|
|
batch.Clear();
|
|
ASSERT_OK(batch.Put(key, Value(i + start, &value_space)));
|
|
ASSERT_OK(db_->Write(WriteOptions(), &batch));
|
|
}
|
|
}
|
|
|
|
void Build(int n, int flush_every = 0) { Build(n, 0, flush_every); }
|
|
|
|
void Check(int min_expected, int max_expected) {
|
|
uint64_t next_expected = 0;
|
|
uint64_t missed = 0;
|
|
int bad_keys = 0;
|
|
int bad_values = 0;
|
|
int correct = 0;
|
|
std::string value_space;
|
|
// Do not verify checksums. If we verify checksums then the
|
|
// db itself will raise errors because data is corrupted.
|
|
// Instead, we want the reads to be successful and this test
|
|
// will detect whether the appropriate corruptions have
|
|
// occurred.
|
|
Iterator* iter = db_->NewIterator(ReadOptions(false, true));
|
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
|
ASSERT_OK(iter->status());
|
|
uint64_t key;
|
|
Slice in(iter->key());
|
|
if (!ConsumeDecimalNumber(&in, &key) ||
|
|
!in.empty() ||
|
|
key < next_expected) {
|
|
bad_keys++;
|
|
continue;
|
|
}
|
|
missed += (key - next_expected);
|
|
next_expected = key + 1;
|
|
if (iter->value() != Value(static_cast<int>(key), &value_space)) {
|
|
bad_values++;
|
|
} else {
|
|
correct++;
|
|
}
|
|
}
|
|
iter->status().PermitUncheckedError();
|
|
delete iter;
|
|
|
|
fprintf(stderr,
|
|
"expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%llu\n",
|
|
min_expected, max_expected, correct, bad_keys, bad_values,
|
|
static_cast<unsigned long long>(missed));
|
|
ASSERT_LE(min_expected, correct);
|
|
ASSERT_GE(max_expected, correct);
|
|
}
|
|
|
|
void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
|
|
// Pick file to corrupt
|
|
std::vector<std::string> filenames;
|
|
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
|
|
uint64_t number;
|
|
FileType type;
|
|
std::string fname;
|
|
int picked_number = -1;
|
|
for (size_t i = 0; i < filenames.size(); i++) {
|
|
if (ParseFileName(filenames[i], &number, &type) &&
|
|
type == filetype &&
|
|
static_cast<int>(number) > picked_number) { // Pick latest file
|
|
fname = dbname_ + "/" + filenames[i];
|
|
picked_number = static_cast<int>(number);
|
|
}
|
|
}
|
|
ASSERT_TRUE(!fname.empty()) << filetype;
|
|
|
|
ASSERT_OK(test::CorruptFile(env_, fname, offset, bytes_to_corrupt));
|
|
}
|
|
|
|
// corrupts exactly one file at level `level`. if no file found at level,
|
|
// asserts
|
|
void CorruptTableFileAtLevel(int level, int offset, int bytes_to_corrupt) {
|
|
std::vector<LiveFileMetaData> metadata;
|
|
db_->GetLiveFilesMetaData(&metadata);
|
|
for (const auto& m : metadata) {
|
|
if (m.level == level) {
|
|
ASSERT_OK(test::CorruptFile(env_, dbname_ + "/" + m.name, offset,
|
|
bytes_to_corrupt));
|
|
return;
|
|
}
|
|
}
|
|
FAIL() << "no file found at level";
|
|
}
|
|
|
|
|
|
int Property(const std::string& name) {
|
|
std::string property;
|
|
int result;
|
|
if (db_->GetProperty(name, &property) &&
|
|
sscanf(property.c_str(), "%d", &result) == 1) {
|
|
return result;
|
|
} else {
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
// Return the ith key
|
|
Slice Key(int i, std::string* storage) {
|
|
char buf[100];
|
|
snprintf(buf, sizeof(buf), "%016d", i);
|
|
storage->assign(buf, strlen(buf));
|
|
return Slice(*storage);
|
|
}
|
|
|
|
// Return the value to associate with the specified key
|
|
Slice Value(int k, std::string* storage) {
|
|
if (k == 0) {
|
|
// Ugh. Random seed of 0 used to produce no entropy. This code
|
|
// preserves the implementation that was in place when all of the
|
|
// magic values in this file were picked.
|
|
*storage = std::string(kValueSize, ' ');
|
|
} else {
|
|
Random r(k);
|
|
*storage = r.RandomString(kValueSize);
|
|
}
|
|
return Slice(*storage);
|
|
}
|
|
|
|
void GetSortedWalFiles(std::vector<uint64_t>& file_nums) {
|
|
std::vector<std::string> tmp_files;
|
|
ASSERT_OK(env_->GetChildren(dbname_, &tmp_files));
|
|
FileType type = kWalFile;
|
|
for (const auto& file : tmp_files) {
|
|
uint64_t number = 0;
|
|
if (ParseFileName(file, &number, &type) && type == kWalFile) {
|
|
file_nums.push_back(number);
|
|
}
|
|
}
|
|
std::sort(file_nums.begin(), file_nums.end());
|
|
}
|
|
|
|
void CorruptFileWithTruncation(FileType file, uint64_t number,
|
|
uint64_t bytes_to_truncate = 0) {
|
|
std::string path;
|
|
switch (file) {
|
|
case FileType::kWalFile:
|
|
path = LogFileName(dbname_, number);
|
|
break;
|
|
// TODO: Add other file types as this method is being used for those file
|
|
// types.
|
|
default:
|
|
return;
|
|
}
|
|
uint64_t old_size = 0;
|
|
ASSERT_OK(env_->GetFileSize(path, &old_size));
|
|
assert(old_size > bytes_to_truncate);
|
|
uint64_t new_size = old_size - bytes_to_truncate;
|
|
// If bytes_to_truncate == 0, it will do full truncation.
|
|
if (bytes_to_truncate == 0) {
|
|
new_size = 0;
|
|
}
|
|
ASSERT_OK(test::TruncateFile(env_, path, new_size));
|
|
}
|
|
};
|
|
|
|
TEST_F(CorruptionTest, Recovery) {
|
|
Build(100);
|
|
Check(100, 100);
|
|
#ifdef OS_WIN
|
|
// On Wndows OS Disk cache does not behave properly
|
|
// We do not call FlushBuffers on every Flush. If we do not close
|
|
// the log file prior to the corruption we end up with the first
|
|
// block not corrupted but only the second. However, under the debugger
|
|
// things work just fine but never pass when running normally
|
|
// For that reason people may want to run with unbuffered I/O. That option
|
|
// is not available for WAL though.
|
|
CloseDb();
|
|
#endif
|
|
Corrupt(kWalFile, 19, 1); // WriteBatch tag for first record
|
|
Corrupt(kWalFile, log::kBlockSize + 1000, 1); // Somewhere in second block
|
|
ASSERT_TRUE(!TryReopen().ok());
|
|
options_.paranoid_checks = false;
|
|
Reopen(&options_);
|
|
|
|
// The 64 records in the first two log blocks are completely lost.
|
|
Check(36, 36);
|
|
}
|
|
|
|
TEST_F(CorruptionTest, PostPITRCorruptionWALsRetained) {
|
|
// Repro for bug where WALs following the point-in-time recovery were not
|
|
// retained leading to the next recovery failing.
|
|
CloseDb();
|
|
|
|
options_.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
|
|
const std::string test_cf_name = "test_cf";
|
|
std::vector<ColumnFamilyDescriptor> cf_descs;
|
|
cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
|
|
cf_descs.emplace_back(test_cf_name, ColumnFamilyOptions());
|
|
|
|
uint64_t log_num;
|
|
{
|
|
options_.create_missing_column_families = true;
|
|
std::vector<ColumnFamilyHandle*> cfhs;
|
|
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k", "v"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k", "v"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), cfhs[0], "k2", "v2"));
|
|
std::vector<uint64_t> file_nums;
|
|
GetSortedWalFiles(file_nums);
|
|
log_num = file_nums.back();
|
|
for (auto* cfh : cfhs) {
|
|
delete cfh;
|
|
}
|
|
CloseDb();
|
|
}
|
|
|
|
CorruptFileWithTruncation(FileType::kWalFile, log_num,
|
|
/*bytes_to_truncate=*/1);
|
|
|
|
{
|
|
// Recover "k" -> "v" for both CFs. "k2" -> "v2" is lost due to truncation.
|
|
options_.avoid_flush_during_recovery = true;
|
|
std::vector<ColumnFamilyHandle*> cfhs;
|
|
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
|
|
// Flush one but not both CFs and write some data so there's a seqno gap
|
|
// between the PITR corruption and the next DB session's first WAL.
|
|
ASSERT_OK(db_->Put(WriteOptions(), cfhs[1], "k2", "v2"));
|
|
ASSERT_OK(db_->Flush(FlushOptions(), cfhs[1]));
|
|
|
|
for (auto* cfh : cfhs) {
|
|
delete cfh;
|
|
}
|
|
CloseDb();
|
|
}
|
|
|
|
// With the bug, this DB open would remove the WALs following the PITR
|
|
// corruption. Then, the next recovery would fail.
|
|
for (int i = 0; i < 2; ++i) {
|
|
std::vector<ColumnFamilyHandle*> cfhs;
|
|
ASSERT_OK(DB::Open(options_, dbname_, cf_descs, &cfhs, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
|
|
for (auto* cfh : cfhs) {
|
|
delete cfh;
|
|
}
|
|
CloseDb();
|
|
}
|
|
}
|
|
|
|
TEST_F(CorruptionTest, RecoverWriteError) {
|
|
env_->writable_file_error_ = true;
|
|
Status s = TryReopen();
|
|
ASSERT_TRUE(!s.ok());
|
|
}
|
|
|
|
TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
|
|
// Do enough writing to force minor compaction
|
|
env_->writable_file_error_ = true;
|
|
const int num =
|
|
static_cast<int>(3 + (Options().write_buffer_size / kValueSize));
|
|
std::string value_storage;
|
|
Status s;
|
|
bool failed = false;
|
|
for (int i = 0; i < num; i++) {
|
|
WriteBatch batch;
|
|
ASSERT_OK(batch.Put("a", Value(100, &value_storage)));
|
|
s = db_->Write(WriteOptions(), &batch);
|
|
if (!s.ok()) {
|
|
failed = true;
|
|
}
|
|
ASSERT_TRUE(!failed || !s.ok());
|
|
}
|
|
ASSERT_TRUE(!s.ok());
|
|
ASSERT_GE(env_->num_writable_file_errors_, 1);
|
|
env_->writable_file_error_ = false;
|
|
Reopen();
|
|
}
|
|
|
|
TEST_F(CorruptionTest, TableFile) {
|
|
Build(100);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
|
|
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
|
|
|
|
Corrupt(kTableFile, 100, 1);
|
|
Check(99, 99);
|
|
ASSERT_NOK(dbi->VerifyChecksum());
|
|
}
|
|
|
|
TEST_F(CorruptionTest, VerifyChecksumReadahead) {
|
|
Options options;
|
|
SpecialEnv senv(env_->target());
|
|
options.env = &senv;
|
|
// Disable block cache as we are going to check checksum for
|
|
// the same file twice and measure number of reads.
|
|
BlockBasedTableOptions table_options_no_bc;
|
|
table_options_no_bc.no_block_cache = true;
|
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options_no_bc));
|
|
|
|
Reopen(&options);
|
|
|
|
Build(10000);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
|
|
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
|
|
|
|
senv.count_random_reads_ = true;
|
|
senv.random_read_counter_.Reset();
|
|
ASSERT_OK(dbi->VerifyChecksum());
|
|
|
|
// Make sure the counter is enabled.
|
|
ASSERT_GT(senv.random_read_counter_.Read(), 0);
|
|
|
|
// The SST file is about 10MB. Default readahead size is 256KB.
|
|
// Give a conservative 20 reads for metadata blocks, The number
|
|
// of random reads should be within 10 MB / 256KB + 20 = 60.
|
|
ASSERT_LT(senv.random_read_counter_.Read(), 60);
|
|
|
|
senv.random_read_bytes_counter_ = 0;
|
|
ReadOptions ro;
|
|
ro.readahead_size = size_t{32 * 1024};
|
|
ASSERT_OK(dbi->VerifyChecksum(ro));
|
|
// The SST file is about 10MB. We set readahead size to 32KB.
|
|
// Give 0 to 20 reads for metadata blocks, and allow real read
|
|
// to range from 24KB to 48KB. The lower bound would be:
|
|
// 10MB / 48KB + 0 = 213
|
|
// The higher bound is
|
|
// 10MB / 24KB + 20 = 447.
|
|
ASSERT_GE(senv.random_read_counter_.Read(), 213);
|
|
ASSERT_LE(senv.random_read_counter_.Read(), 447);
|
|
|
|
// Test readahead shouldn't break mmap mode (where it should be
|
|
// disabled).
|
|
options.allow_mmap_reads = true;
|
|
Reopen(&options);
|
|
dbi = static_cast<DBImpl*>(db_);
|
|
ASSERT_OK(dbi->VerifyChecksum(ro));
|
|
|
|
CloseDb();
|
|
}
|
|
|
|
TEST_F(CorruptionTest, TableFileIndexData) {
|
|
Options options;
|
|
// very big, we'll trigger flushes manually
|
|
options.write_buffer_size = 100 * 1024 * 1024;
|
|
Reopen(&options);
|
|
// build 2 tables, flush at 5000
|
|
Build(10000, 5000);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
|
|
// corrupt an index block of an entire file
|
|
Corrupt(kTableFile, -2000, 500);
|
|
options.paranoid_checks = false;
|
|
Reopen(&options);
|
|
dbi = static_cast_with_check<DBImpl>(db_);
|
|
// one full file may be readable, since only one was corrupted
|
|
// the other file should be fully non-readable, since index was corrupted
|
|
Check(0, 5000);
|
|
ASSERT_NOK(dbi->VerifyChecksum());
|
|
|
|
// In paranoid mode, the db cannot be opened due to the corrupted file.
|
|
ASSERT_TRUE(TryReopen().IsCorruption());
|
|
}
|
|
|
|
TEST_F(CorruptionTest, MissingDescriptor) {
|
|
Build(1000);
|
|
RepairDB();
|
|
Reopen();
|
|
Check(1000, 1000);
|
|
}
|
|
|
|
TEST_F(CorruptionTest, SequenceNumberRecovery) {
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v3"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v4"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v5"));
|
|
RepairDB();
|
|
Reopen();
|
|
std::string v;
|
|
ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
|
|
ASSERT_EQ("v5", v);
|
|
// Write something. If sequence number was not recovered properly,
|
|
// it will be hidden by an earlier write.
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "v6"));
|
|
ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
|
|
ASSERT_EQ("v6", v);
|
|
Reopen();
|
|
ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
|
|
ASSERT_EQ("v6", v);
|
|
}
|
|
|
|
TEST_F(CorruptionTest, CorruptedDescriptor) {
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo", "hello"));
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
ASSERT_OK(
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
|
|
|
|
Corrupt(kDescriptorFile, 0, 1000);
|
|
Status s = TryReopen();
|
|
ASSERT_TRUE(!s.ok());
|
|
|
|
RepairDB();
|
|
Reopen();
|
|
std::string v;
|
|
ASSERT_OK(db_->Get(ReadOptions(), "foo", &v));
|
|
ASSERT_EQ("hello", v);
|
|
}
|
|
|
|
TEST_F(CorruptionTest, CompactionInputError) {
|
|
Options options;
|
|
options.env = env_;
|
|
Reopen(&options);
|
|
Build(10);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr));
|
|
ASSERT_OK(dbi->TEST_CompactRange(1, nullptr, nullptr));
|
|
ASSERT_EQ(1, Property("rocksdb.num-files-at-level2"));
|
|
|
|
Corrupt(kTableFile, 100, 1);
|
|
Check(9, 9);
|
|
ASSERT_NOK(dbi->VerifyChecksum());
|
|
|
|
// Force compactions by writing lots of values
|
|
Build(10000);
|
|
Check(10000, 10000);
|
|
ASSERT_NOK(dbi->VerifyChecksum());
|
|
}
|
|
|
|
TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.paranoid_checks = true;
|
|
options.write_buffer_size = 131072;
|
|
options.max_write_buffer_number = 2;
|
|
Reopen(&options);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
|
|
// Fill levels >= 1
|
|
for (int level = 1; level < dbi->NumberLevels(); level++) {
|
|
ASSERT_OK(dbi->Put(WriteOptions(), "", "begin"));
|
|
ASSERT_OK(dbi->Put(WriteOptions(), "~", "end"));
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
for (int comp_level = 0; comp_level < dbi->NumberLevels() - level;
|
|
++comp_level) {
|
|
ASSERT_OK(dbi->TEST_CompactRange(comp_level, nullptr, nullptr));
|
|
}
|
|
}
|
|
|
|
Reopen(&options);
|
|
|
|
dbi = static_cast_with_check<DBImpl>(db_);
|
|
Build(10);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
ASSERT_OK(dbi->TEST_WaitForCompact());
|
|
ASSERT_EQ(1, Property("rocksdb.num-files-at-level0"));
|
|
|
|
CorruptTableFileAtLevel(0, 100, 1);
|
|
Check(9, 9);
|
|
ASSERT_NOK(dbi->VerifyChecksum());
|
|
|
|
// Write must eventually fail because of corrupted table
|
|
Status s;
|
|
std::string tmp1, tmp2;
|
|
bool failed = false;
|
|
for (int i = 0; i < 10000; i++) {
|
|
s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
|
|
if (!s.ok()) {
|
|
failed = true;
|
|
}
|
|
// if one write failed, every subsequent write must fail, too
|
|
ASSERT_TRUE(!failed || !s.ok()) << "write did not fail in a corrupted db";
|
|
}
|
|
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
|
|
}
|
|
|
|
TEST_F(CorruptionTest, UnrelatedKeys) {
|
|
Build(10);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
Corrupt(kTableFile, 100, 1);
|
|
ASSERT_NOK(dbi->VerifyChecksum());
|
|
|
|
std::string tmp1, tmp2;
|
|
ASSERT_OK(db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
|
|
std::string v;
|
|
ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
|
|
ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
ASSERT_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
|
|
ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
|
|
}
|
|
|
|
TEST_F(CorruptionTest, RangeDeletionCorrupted) {
|
|
ASSERT_OK(
|
|
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "b"));
|
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
|
std::vector<LiveFileMetaData> metadata;
|
|
db_->GetLiveFilesMetaData(&metadata);
|
|
ASSERT_EQ(static_cast<size_t>(1), metadata.size());
|
|
std::string filename = dbname_ + metadata[0].name;
|
|
|
|
FileOptions file_opts;
|
|
const auto& fs = options_.env->GetFileSystem();
|
|
std::unique_ptr<RandomAccessFileReader> file_reader;
|
|
ASSERT_OK(RandomAccessFileReader::Create(fs, filename, file_opts,
|
|
&file_reader, nullptr));
|
|
|
|
uint64_t file_size;
|
|
ASSERT_OK(
|
|
fs->GetFileSize(filename, file_opts.io_options, &file_size, nullptr));
|
|
|
|
BlockHandle range_del_handle;
|
|
ASSERT_OK(FindMetaBlockInFile(
|
|
file_reader.get(), file_size, kBlockBasedTableMagicNumber,
|
|
ImmutableOptions(options_), kRangeDelBlockName, &range_del_handle));
|
|
|
|
ASSERT_OK(TryReopen());
|
|
ASSERT_OK(test::CorruptFile(env_, filename,
|
|
static_cast<int>(range_del_handle.offset()), 1));
|
|
ASSERT_TRUE(TryReopen().IsCorruption());
|
|
}
|
|
|
|
TEST_F(CorruptionTest, FileSystemStateCorrupted) {
|
|
for (int iter = 0; iter < 2; ++iter) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.paranoid_checks = true;
|
|
options.create_if_missing = true;
|
|
Reopen(&options);
|
|
Build(10);
|
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
std::vector<LiveFileMetaData> metadata;
|
|
dbi->GetLiveFilesMetaData(&metadata);
|
|
ASSERT_GT(metadata.size(), 0);
|
|
std::string filename = dbname_ + metadata[0].name;
|
|
|
|
delete db_;
|
|
db_ = nullptr;
|
|
|
|
if (iter == 0) { // corrupt file size
|
|
std::unique_ptr<WritableFile> file;
|
|
ASSERT_OK(env_->NewWritableFile(filename, &file, EnvOptions()));
|
|
ASSERT_OK(file->Append(Slice("corrupted sst")));
|
|
file.reset();
|
|
Status x = TryReopen(&options);
|
|
ASSERT_TRUE(x.IsCorruption());
|
|
} else { // delete the file
|
|
ASSERT_OK(env_->DeleteFile(filename));
|
|
Status x = TryReopen(&options);
|
|
ASSERT_TRUE(x.IsCorruption());
|
|
}
|
|
|
|
ASSERT_OK(DestroyDB(dbname_, options_));
|
|
}
|
|
}
|
|
|
|
static const auto& corruption_modes = {
|
|
mock::MockTableFactory::kCorruptNone, mock::MockTableFactory::kCorruptKey,
|
|
mock::MockTableFactory::kCorruptValue,
|
|
mock::MockTableFactory::kCorruptReorderKey};
|
|
|
|
TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.check_flush_compaction_key_order = false;
|
|
options.paranoid_file_checks = true;
|
|
options.create_if_missing = true;
|
|
Status s;
|
|
for (const auto& mode : corruption_modes) {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
s = DestroyDB(dbname_, options);
|
|
ASSERT_OK(s);
|
|
std::shared_ptr<mock::MockTableFactory> mock =
|
|
std::make_shared<mock::MockTableFactory>();
|
|
options.table_factory = mock;
|
|
mock->SetCorruptionMode(mode);
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
Build(10);
|
|
s = db_->Flush(FlushOptions());
|
|
if (mode == mock::MockTableFactory::kCorruptNone) {
|
|
ASSERT_OK(s);
|
|
} else {
|
|
ASSERT_NOK(s);
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.paranoid_file_checks = true;
|
|
options.create_if_missing = true;
|
|
options.check_flush_compaction_key_order = false;
|
|
Status s;
|
|
for (const auto& mode : corruption_modes) {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
s = DestroyDB(dbname_, options);
|
|
std::shared_ptr<mock::MockTableFactory> mock =
|
|
std::make_shared<mock::MockTableFactory>();
|
|
options.table_factory = mock;
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
Build(100, 2);
|
|
// ASSERT_OK(db_->Flush(FlushOptions()));
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
mock->SetCorruptionMode(mode);
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
s = dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
|
|
if (mode == mock::MockTableFactory::kCorruptNone) {
|
|
ASSERT_OK(s);
|
|
} else {
|
|
ASSERT_NOK(s);
|
|
}
|
|
}
|
|
}
|
|
|
|
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeFirst) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.check_flush_compaction_key_order = false;
|
|
options.paranoid_file_checks = true;
|
|
options.create_if_missing = true;
|
|
for (bool do_flush : {true, false}) {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
std::string start, end;
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(3, &start), Key(7, &end)));
|
|
auto snap = db_->GetSnapshot();
|
|
ASSERT_NE(snap, nullptr);
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(8, &start), Key(9, &end)));
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(2, &start), Key(5, &end)));
|
|
Build(10);
|
|
if (do_flush) {
|
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
|
} else {
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
ASSERT_OK(
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
|
|
}
|
|
db_->ReleaseSnapshot(snap);
|
|
}
|
|
}
|
|
|
|
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRange) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.check_flush_compaction_key_order = false;
|
|
options.paranoid_file_checks = true;
|
|
options.create_if_missing = true;
|
|
for (bool do_flush : {true, false}) {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
Build(10, 0, 0);
|
|
std::string start, end;
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(5, &start), Key(15, &end)));
|
|
auto snap = db_->GetSnapshot();
|
|
ASSERT_NE(snap, nullptr);
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(8, &start), Key(9, &end)));
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(12, &start), Key(17, &end)));
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(2, &start), Key(4, &end)));
|
|
Build(10, 10, 0);
|
|
if (do_flush) {
|
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
|
} else {
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
ASSERT_OK(
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
|
|
}
|
|
db_->ReleaseSnapshot(snap);
|
|
}
|
|
}
|
|
|
|
TEST_F(CorruptionTest, ParanoidFileChecksWithDeleteRangeLast) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.check_flush_compaction_key_order = false;
|
|
options.paranoid_file_checks = true;
|
|
options.create_if_missing = true;
|
|
for (bool do_flush : {true, false}) {
|
|
delete db_;
|
|
db_ = nullptr;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
std::string start, end;
|
|
Build(10);
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(3, &start), Key(7, &end)));
|
|
auto snap = db_->GetSnapshot();
|
|
ASSERT_NE(snap, nullptr);
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(6, &start), Key(8, &end)));
|
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
|
Key(2, &start), Key(5, &end)));
|
|
if (do_flush) {
|
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
|
} else {
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
ASSERT_OK(
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
|
|
}
|
|
db_->ReleaseSnapshot(snap);
|
|
}
|
|
}
|
|
|
|
TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.create_if_missing = true;
|
|
options.allow_data_in_errors = true;
|
|
auto mode = mock::MockTableFactory::kCorruptKey;
|
|
delete db_;
|
|
db_ = nullptr;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
|
|
std::shared_ptr<mock::MockTableFactory> mock =
|
|
std::make_shared<mock::MockTableFactory>();
|
|
mock->SetCorruptionMode(mode);
|
|
options.table_factory = mock;
|
|
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
Build(100, 2);
|
|
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
Status s =
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr);
|
|
ASSERT_NOK(s);
|
|
ASSERT_TRUE(s.IsCorruption());
|
|
}
|
|
|
|
TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.paranoid_file_checks = false;
|
|
options.create_if_missing = true;
|
|
options.check_flush_compaction_key_order = false;
|
|
delete db_;
|
|
db_ = nullptr;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
std::shared_ptr<mock::MockTableFactory> mock =
|
|
std::make_shared<mock::MockTableFactory>();
|
|
options.table_factory = mock;
|
|
ASSERT_OK(DB::Open(options, dbname_, &db_));
|
|
assert(db_ != nullptr); // suppress false clang-analyze report
|
|
mock->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey);
|
|
Build(100, 2);
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
|
|
mock->SetCorruptionMode(mock::MockTableFactory::kCorruptNone);
|
|
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
ASSERT_NOK(
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
|
|
}
|
|
|
|
TEST_F(CorruptionTest, FlushKeyOrderCheck) {
|
|
Options options;
|
|
options.env = env_;
|
|
options.paranoid_file_checks = false;
|
|
options.create_if_missing = true;
|
|
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
|
|
|
|
int cnt = 0;
|
|
// Generate some out of order keys from the memtable
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"MemTableIterator::Next:0", [&](void* arg) {
|
|
MemTableRep::Iterator* mem_iter =
|
|
static_cast<MemTableRep::Iterator*>(arg);
|
|
if (++cnt == 3) {
|
|
mem_iter->Prev();
|
|
mem_iter->Prev();
|
|
}
|
|
});
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
Status s = static_cast_with_check<DBImpl>(db_)->TEST_FlushMemTable();
|
|
ASSERT_NOK(s);
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
|
|
TEST_F(CorruptionTest, DisableKeyOrderCheck) {
|
|
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "false"}}));
|
|
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
|
|
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"OutputValidator::Add:order_check",
|
|
[&](void* /*arg*/) { ASSERT_TRUE(false); });
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1"));
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
|
|
ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
|
|
ASSERT_OK(dbi->TEST_FlushMemTable());
|
|
CompactRangeOptions cro;
|
|
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
|
|
ASSERT_OK(
|
|
dbi->CompactRange(cro, dbi->DefaultColumnFamily(), nullptr, nullptr));
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
|
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
|
|
TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
|
|
CloseDb();
|
|
Options options;
|
|
options.env = env_;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
options.create_if_missing = true;
|
|
options.file_checksum_gen_factory =
|
|
ROCKSDB_NAMESPACE::GetFileChecksumGenCrc32cFactory();
|
|
Reopen(&options);
|
|
|
|
Build(10, 5);
|
|
|
|
ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
|
|
CloseDb();
|
|
|
|
// Corrupt the first byte of each table file, this must be data block.
|
|
Corrupt(kTableFile, 0, 1);
|
|
|
|
ASSERT_OK(TryReopen(&options));
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
int count{0};
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"DBImpl::VerifyFullFileChecksum:mismatch", [&](void* arg) {
|
|
auto* s = reinterpret_cast<Status*>(arg);
|
|
ASSERT_NE(s, nullptr);
|
|
++count;
|
|
ASSERT_NOK(*s);
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsCorruption());
|
|
ASSERT_EQ(1, count);
|
|
}
|
|
|
|
class CrashDuringRecoveryWithCorruptionTest
|
|
: public CorruptionTest,
|
|
public testing::WithParamInterface<std::tuple<bool, bool>> {
|
|
public:
|
|
explicit CrashDuringRecoveryWithCorruptionTest()
|
|
: CorruptionTest(),
|
|
avoid_flush_during_recovery_(std::get<0>(GetParam())),
|
|
track_and_verify_wals_in_manifest_(std::get<1>(GetParam())) {}
|
|
|
|
protected:
|
|
const bool avoid_flush_during_recovery_;
|
|
const bool track_and_verify_wals_in_manifest_;
|
|
};
|
|
|
|
INSTANTIATE_TEST_CASE_P(CorruptionTest, CrashDuringRecoveryWithCorruptionTest,
|
|
::testing::Values(std::make_tuple(true, false),
|
|
std::make_tuple(false, false),
|
|
std::make_tuple(true, true),
|
|
std::make_tuple(false, true)));
|
|
|
|
// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
|
|
// won't flush the data from WAL to L0 for all column families if possible. As a
|
|
// result, not all column families can increase their log_numbers, and
|
|
// min_log_number_to_keep won't change.
|
|
// It may prematurely persist a new MANIFEST even before we can declare the DB
|
|
// is in consistent state after recovery (this is when the new WAL is synced)
|
|
// and advances log_numbers for some column families.
|
|
//
|
|
// If there is power failure before we sync the new WAL, we will end up in
|
|
// a situation in which after persisting the MANIFEST, RocksDB will see some
|
|
// column families' log_numbers larger than the corrupted wal, and
|
|
// "Column family inconsistency: SST file contains data beyond the point of
|
|
// corruption" error will be hit, causing recovery to fail.
|
|
//
|
|
// After adding the fix, only after new WAL is synced, RocksDB persist a new
|
|
// MANIFEST with column families to ensure RocksDB is in consistent state.
|
|
// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
|
|
// synced immediately afterwards. The sequence number of the sentinel
|
|
// WriteBatch will be the next sequence number immediately after the largest
|
|
// sequence number recovered from previous WALs and MANIFEST because of which DB
|
|
// will be in consistent state.
|
|
// If a future recovery starts from the new MANIFEST, then it means the new WAL
|
|
// is successfully synced. Due to the sentinel empty write batch at the
|
|
// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
|
|
// If future recovery starts from the old MANIFEST, it means the writing the new
|
|
// MANIFEST failed. It won't have the "SST ahead of WAL" error.
|
|
//
|
|
// The combination of corrupting a WAL and injecting an error during subsequent
|
|
// re-open exposes the bug of prematurely persisting a new MANIFEST with
|
|
// advanced ColumnFamilyData::log_number.
|
|
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecovery) {
|
|
CloseDb();
|
|
Options options;
|
|
options.track_and_verify_wals_in_manifest =
|
|
track_and_verify_wals_in_manifest_;
|
|
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.env = env_;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
options.create_if_missing = true;
|
|
options.max_write_buffer_number = 8;
|
|
|
|
Reopen(&options);
|
|
Status s;
|
|
const std::string test_cf_name = "test_cf";
|
|
ColumnFamilyHandle* cfh = nullptr;
|
|
s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
|
|
ASSERT_OK(s);
|
|
delete cfh;
|
|
CloseDb();
|
|
|
|
std::vector<ColumnFamilyDescriptor> cf_descs;
|
|
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
|
|
cf_descs.emplace_back(test_cf_name, options);
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
|
|
// 1. Open and populate the DB. Write and flush default_cf several times to
|
|
// advance wal number so that some column families have advanced log_number
|
|
// while other don't.
|
|
{
|
|
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
|
|
auto* dbimpl = static_cast_with_check<DBImpl>(db_);
|
|
assert(dbimpl);
|
|
|
|
// Write one key to test_cf.
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
|
|
ASSERT_OK(db_->Flush(FlushOptions(), handles[1]));
|
|
|
|
// Write to default_cf and flush this cf several times to advance wal
|
|
// number. TEST_SwitchMemtable makes sure WALs are not synced and test can
|
|
// corrupt un-sync WAL.
|
|
for (int i = 0; i < 2; ++i) {
|
|
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
|
|
"value" + std::to_string(i)));
|
|
ASSERT_OK(dbimpl->TEST_SwitchMemtable());
|
|
}
|
|
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
handles.clear();
|
|
CloseDb();
|
|
}
|
|
|
|
// 2. Corrupt second last un-syned wal file to emulate power reset which
|
|
// caused the DB to lose the un-synced WAL.
|
|
{
|
|
std::vector<uint64_t> file_nums;
|
|
GetSortedWalFiles(file_nums);
|
|
size_t size = file_nums.size();
|
|
assert(size >= 2);
|
|
uint64_t log_num = file_nums[size - 2];
|
|
CorruptFileWithTruncation(FileType::kWalFile, log_num,
|
|
/*bytes_to_truncate=*/8);
|
|
}
|
|
|
|
// 3. After first crash reopen the DB which contains corrupted WAL. Default
|
|
// family has higher log number than corrupted wal number.
|
|
//
|
|
// Case1: If avoid_flush_during_recovery = true, RocksDB won't flush the data
|
|
// from WAL to L0 for all column families (test_cf_name in this case). As a
|
|
// result, not all column families can increase their log_numbers, and
|
|
// min_log_number_to_keep won't change.
|
|
//
|
|
// Case2: If avoid_flush_during_recovery = false, all column families have
|
|
// flushed their data from WAL to L0 during recovery, and none of them will
|
|
// ever need to read the WALs again.
|
|
|
|
// 4. Fault is injected to fail the recovery.
|
|
{
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
|
|
auto* tmp_s = reinterpret_cast<Status*>(arg);
|
|
assert(tmp_s);
|
|
*tmp_s = Status::IOError("Injected");
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
handles.clear();
|
|
options.avoid_flush_during_recovery = true;
|
|
s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
|
|
ASSERT_TRUE(s.IsIOError());
|
|
ASSERT_EQ("IO error: Injected", s.ToString());
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
CloseDb();
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
|
|
// 5. After second crash reopen the db with second corruption. Default family
|
|
// has higher log number than corrupted wal number.
|
|
//
|
|
// Case1: If avoid_flush_during_recovery = true, we persist a new
|
|
// MANIFEST with advanced log_numbers for some column families only after
|
|
// syncing the WAL. So during second crash, RocksDB will skip the corrupted
|
|
// WAL files as they have been moved to different folder. Since newly synced
|
|
// WAL file's sequence number (sentinel WriteBatch) will be the next
|
|
// sequence number immediately after the largest sequence number recovered
|
|
// from previous WALs and MANIFEST, db will be in consistent state and opens
|
|
// successfully.
|
|
//
|
|
// Case2: If avoid_flush_during_recovery = false, the corrupted WAL is below
|
|
// this number. So during a second crash after persisting the new MANIFEST,
|
|
// RocksDB will skip the corrupted WAL(s) because they are all below this
|
|
// bound. Therefore, we won't hit the "column family inconsistency" error
|
|
// message.
|
|
{
|
|
options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
|
|
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
|
|
|
|
// Verify that data is not lost.
|
|
{
|
|
std::string v;
|
|
ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
|
|
ASSERT_EQ("dontcare", v);
|
|
|
|
v.clear();
|
|
ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(0), &v));
|
|
ASSERT_EQ("value" + std::to_string(0), v);
|
|
|
|
// Since it's corrupting second last wal, below key is not found.
|
|
v.clear();
|
|
ASSERT_EQ(db_->Get(ReadOptions(), "key" + std::to_string(1), &v),
|
|
Status::NotFound());
|
|
}
|
|
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
handles.clear();
|
|
CloseDb();
|
|
}
|
|
}
|
|
|
|
// In case of TransactionDB, it enables two-phase-commit. The prepare section of
|
|
// an uncommitted transaction always need to be kept. Even if we perform flush
|
|
// during recovery, we may still need to hold an old WAL. The
|
|
// min_log_number_to_keep won't change, and "Column family inconsistency: SST
|
|
// file contains data beyond the point of corruption" error will be hit, causing
|
|
// recovery to fail.
|
|
//
|
|
// After adding the fix, only after new WAL is synced, RocksDB persist a new
|
|
// MANIFEST with column families to ensure RocksDB is in consistent state.
|
|
// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
|
|
// synced immediately afterwards. The sequence number of the sentinel
|
|
// WriteBatch will be the next sequence number immediately after the largest
|
|
// sequence number recovered from previous WALs and MANIFEST because of which DB
|
|
// will be in consistent state.
|
|
// If a future recovery starts from the new MANIFEST, then it means the new WAL
|
|
// is successfully synced. Due to the sentinel empty write batch at the
|
|
// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
|
|
// If future recovery starts from the old MANIFEST, it means the writing the new
|
|
// MANIFEST failed. It won't have the "SST ahead of WAL" error.
|
|
//
|
|
// The combination of corrupting a WAL and injecting an error during subsequent
|
|
// re-open exposes the bug of prematurely persisting a new MANIFEST with
|
|
// advanced ColumnFamilyData::log_number.
|
|
TEST_P(CrashDuringRecoveryWithCorruptionTest, TxnDbCrashDuringRecovery) {
|
|
CloseDb();
|
|
Options options;
|
|
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
options.track_and_verify_wals_in_manifest =
|
|
track_and_verify_wals_in_manifest_;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.env = env_;
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
options.create_if_missing = true;
|
|
options.max_write_buffer_number = 3;
|
|
Reopen(&options);
|
|
|
|
// Create cf test_cf_name.
|
|
ColumnFamilyHandle* cfh = nullptr;
|
|
const std::string test_cf_name = "test_cf";
|
|
Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
|
|
ASSERT_OK(s);
|
|
delete cfh;
|
|
CloseDb();
|
|
|
|
std::vector<ColumnFamilyDescriptor> cf_descs;
|
|
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
|
|
cf_descs.emplace_back(test_cf_name, options);
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
|
|
TransactionDB* txn_db = nullptr;
|
|
TransactionDBOptions txn_db_opts;
|
|
|
|
// 1. Open and populate the DB. Write and flush default_cf several times to
|
|
// advance wal number so that some column families have advanced log_number
|
|
// while other don't.
|
|
{
|
|
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
|
|
&handles, &txn_db));
|
|
|
|
auto* txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
|
|
// Put cf1
|
|
ASSERT_OK(txn->Put(handles[1], "foo", "value"));
|
|
ASSERT_OK(txn->SetName("txn0"));
|
|
ASSERT_OK(txn->Prepare());
|
|
ASSERT_OK(txn_db->Flush(FlushOptions()));
|
|
|
|
delete txn;
|
|
txn = nullptr;
|
|
|
|
auto* dbimpl = static_cast_with_check<DBImpl>(txn_db->GetRootDB());
|
|
assert(dbimpl);
|
|
|
|
// Put and flush cf0
|
|
for (int i = 0; i < 2; ++i) {
|
|
ASSERT_OK(txn_db->Put(WriteOptions(), "key" + std::to_string(i),
|
|
"value" + std::to_string(i)));
|
|
ASSERT_OK(dbimpl->TEST_SwitchMemtable());
|
|
}
|
|
|
|
// Put cf1
|
|
txn = txn_db->BeginTransaction(WriteOptions(), TransactionOptions());
|
|
ASSERT_OK(txn->Put(handles[1], "foo1", "value1"));
|
|
ASSERT_OK(txn->Commit());
|
|
|
|
delete txn;
|
|
txn = nullptr;
|
|
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
handles.clear();
|
|
delete txn_db;
|
|
}
|
|
|
|
// 2. Corrupt second last wal to emulate power reset which caused the DB to
|
|
// lose the un-synced WAL.
|
|
{
|
|
std::vector<uint64_t> file_nums;
|
|
GetSortedWalFiles(file_nums);
|
|
size_t size = file_nums.size();
|
|
assert(size >= 2);
|
|
uint64_t log_num = file_nums[size - 2];
|
|
CorruptFileWithTruncation(FileType::kWalFile, log_num,
|
|
/*bytes_to_truncate=*/8);
|
|
}
|
|
|
|
// 3. After first crash reopen the DB which contains corrupted WAL. Default
|
|
// family has higher log number than corrupted wal number. There may be old
|
|
// WAL files that it must not delete because they can contain data of
|
|
// uncommitted transactions. As a result, min_log_number_to_keep won't change.
|
|
|
|
{
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"DBImpl::Open::BeforeSyncWAL", [&](void* arg) {
|
|
auto* tmp_s = reinterpret_cast<Status*>(arg);
|
|
assert(tmp_s);
|
|
*tmp_s = Status::IOError("Injected");
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
handles.clear();
|
|
s = TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs, &handles,
|
|
&txn_db);
|
|
ASSERT_TRUE(s.IsIOError());
|
|
ASSERT_EQ("IO error: Injected", s.ToString());
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
CloseDb();
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
|
|
// 4. Corrupt max_wal_num.
|
|
{
|
|
std::vector<uint64_t> file_nums;
|
|
GetSortedWalFiles(file_nums);
|
|
size_t size = file_nums.size();
|
|
uint64_t log_num = file_nums[size - 1];
|
|
CorruptFileWithTruncation(FileType::kWalFile, log_num);
|
|
}
|
|
|
|
// 5. After second crash reopen the db with second corruption. Default family
|
|
// has higher log number than corrupted wal number.
|
|
// We persist a new MANIFEST with advanced log_numbers for some column
|
|
// families only after syncing the WAL. So during second crash, RocksDB will
|
|
// skip the corrupted WAL files as they have been moved to different folder.
|
|
// Since newly synced WAL file's sequence number (sentinel WriteBatch) will be
|
|
// the next sequence number immediately after the largest sequence number
|
|
// recovered from previous WALs and MANIFEST, db will be in consistent state
|
|
// and opens successfully.
|
|
{
|
|
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, cf_descs,
|
|
&handles, &txn_db));
|
|
|
|
// Verify that data is not lost.
|
|
{
|
|
std::string v;
|
|
// Key not visible since it's not committed.
|
|
ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo", &v),
|
|
Status::NotFound());
|
|
|
|
v.clear();
|
|
ASSERT_OK(txn_db->Get(ReadOptions(), "key" + std::to_string(0), &v));
|
|
ASSERT_EQ("value" + std::to_string(0), v);
|
|
|
|
// Last WAL is corrupted which contains two keys below.
|
|
v.clear();
|
|
ASSERT_EQ(txn_db->Get(ReadOptions(), "key" + std::to_string(1), &v),
|
|
Status::NotFound());
|
|
v.clear();
|
|
ASSERT_EQ(txn_db->Get(ReadOptions(), handles[1], "foo1", &v),
|
|
Status::NotFound());
|
|
}
|
|
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
delete txn_db;
|
|
}
|
|
}
|
|
|
|
// This test is similar to
|
|
// CrashDuringRecoveryWithCorruptionTest.CrashDuringRecovery except it calls
|
|
// flush and corrupts Last WAL. It calls flush to sync some of the WALs and
|
|
// remaining are unsyned one of which is then corrupted to simulate crash.
|
|
//
|
|
// In case of non-TransactionDB with avoid_flush_during_recovery = true, RocksDB
|
|
// won't flush the data from WAL to L0 for all column families if possible. As a
|
|
// result, not all column families can increase their log_numbers, and
|
|
// min_log_number_to_keep won't change.
|
|
// It may prematurely persist a new MANIFEST even before we can declare the DB
|
|
// is in consistent state after recovery (this is when the new WAL is synced)
|
|
// and advances log_numbers for some column families.
|
|
//
|
|
// If there is power failure before we sync the new WAL, we will end up in
|
|
// a situation in which after persisting the MANIFEST, RocksDB will see some
|
|
// column families' log_numbers larger than the corrupted wal, and
|
|
// "Column family inconsistency: SST file contains data beyond the point of
|
|
// corruption" error will be hit, causing recovery to fail.
|
|
//
|
|
// After adding the fix, only after new WAL is synced, RocksDB persist a new
|
|
// MANIFEST with column families to ensure RocksDB is in consistent state.
|
|
// RocksDB writes an empty WriteBatch as a sentinel to the new WAL which is
|
|
// synced immediately afterwards. The sequence number of the sentinel
|
|
// WriteBatch will be the next sequence number immediately after the largest
|
|
// sequence number recovered from previous WALs and MANIFEST because of which DB
|
|
// will be in consistent state.
|
|
// If a future recovery starts from the new MANIFEST, then it means the new WAL
|
|
// is successfully synced. Due to the sentinel empty write batch at the
|
|
// beginning, kPointInTimeRecovery of WAL is guaranteed to go after this point.
|
|
// If future recovery starts from the old MANIFEST, it means the writing the new
|
|
// MANIFEST failed. It won't have the "SST ahead of WAL" error.
|
|
|
|
// The combination of corrupting a WAL and injecting an error during subsequent
|
|
// re-open exposes the bug of prematurely persisting a new MANIFEST with
|
|
// advanced ColumnFamilyData::log_number.
|
|
TEST_P(CrashDuringRecoveryWithCorruptionTest, CrashDuringRecoveryWithFlush) {
|
|
CloseDb();
|
|
Options options;
|
|
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
|
|
options.avoid_flush_during_recovery = false;
|
|
options.env = env_;
|
|
options.create_if_missing = true;
|
|
|
|
ASSERT_OK(DestroyDB(dbname_, options));
|
|
Reopen(&options);
|
|
|
|
ColumnFamilyHandle* cfh = nullptr;
|
|
const std::string test_cf_name = "test_cf";
|
|
Status s = db_->CreateColumnFamily(options, test_cf_name, &cfh);
|
|
ASSERT_OK(s);
|
|
delete cfh;
|
|
|
|
CloseDb();
|
|
|
|
std::vector<ColumnFamilyDescriptor> cf_descs;
|
|
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
|
|
cf_descs.emplace_back(test_cf_name, options);
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
|
|
{
|
|
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
|
|
|
|
// Write one key to test_cf.
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles[1], "old_key", "dontcare"));
|
|
|
|
// Write to default_cf and flush this cf several times to advance wal
|
|
// number.
|
|
for (int i = 0; i < 2; ++i) {
|
|
ASSERT_OK(db_->Put(WriteOptions(), "key" + std::to_string(i),
|
|
"value" + std::to_string(i)));
|
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
|
}
|
|
|
|
ASSERT_OK(db_->Put(WriteOptions(), handles[1], "dontcare", "dontcare"));
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
handles.clear();
|
|
CloseDb();
|
|
}
|
|
|
|
// Corrupt second last un-syned wal file to emulate power reset which
|
|
// caused the DB to lose the un-synced WAL.
|
|
{
|
|
std::vector<uint64_t> file_nums;
|
|
GetSortedWalFiles(file_nums);
|
|
size_t size = file_nums.size();
|
|
uint64_t log_num = file_nums[size - 1];
|
|
CorruptFileWithTruncation(FileType::kWalFile, log_num,
|
|
/*bytes_to_truncate=*/8);
|
|
}
|
|
|
|
// Fault is injected to fail the recovery.
|
|
{
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
SyncPoint::GetInstance()->SetCallBack(
|
|
"DBImpl::GetLogSizeAndMaybeTruncate:0", [&](void* arg) {
|
|
auto* tmp_s = reinterpret_cast<Status*>(arg);
|
|
assert(tmp_s);
|
|
*tmp_s = Status::IOError("Injected");
|
|
});
|
|
SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
handles.clear();
|
|
options.avoid_flush_during_recovery = true;
|
|
s = DB::Open(options, dbname_, cf_descs, &handles, &db_);
|
|
ASSERT_TRUE(s.IsIOError());
|
|
ASSERT_EQ("IO error: Injected", s.ToString());
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
CloseDb();
|
|
|
|
SyncPoint::GetInstance()->DisableProcessing();
|
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
|
}
|
|
|
|
// Reopen db again
|
|
{
|
|
options.avoid_flush_during_recovery = avoid_flush_during_recovery_;
|
|
ASSERT_OK(DB::Open(options, dbname_, cf_descs, &handles, &db_));
|
|
|
|
// Verify that data is not lost.
|
|
{
|
|
std::string v;
|
|
ASSERT_OK(db_->Get(ReadOptions(), handles[1], "old_key", &v));
|
|
ASSERT_EQ("dontcare", v);
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
v.clear();
|
|
ASSERT_OK(db_->Get(ReadOptions(), "key" + std::to_string(i), &v));
|
|
ASSERT_EQ("value" + std::to_string(i), v);
|
|
}
|
|
|
|
// Since it's corrupting last wal after Flush, below key is not found.
|
|
v.clear();
|
|
ASSERT_EQ(db_->Get(ReadOptions(), handles[1], "dontcare", &v),
|
|
Status::NotFound());
|
|
}
|
|
|
|
for (auto* h : handles) {
|
|
delete h;
|
|
}
|
|
}
|
|
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
RegisterCustomObjects(argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|
|
|
|
#else
|
|
#include <stdio.h>
|
|
|
|
int main(int /*argc*/, char** /*argv*/) {
|
|
fprintf(stderr, "SKIPPED as RepairDB() is not supported in ROCKSDB_LITE\n");
|
|
return 0;
|
|
}
|
|
|
|
#endif // !ROCKSDB_LITE
|