// Copyright (c) 2015, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // // Copyright 2014 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // This test uses a custom Env to keep track of the state of a filesystem as of // the last "sync". It then checks for data loss errors by purposely dropping // file data (or entire files) not protected by a "sync". #include #include #include "db/db_impl.h" #include "db/filename.h" #include "db/log_format.h" #include "db/version_set.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/table.h" #include "rocksdb/write_batch.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" namespace rocksdb { static const int kValueSize = 1000; static const int kMaxNumValues = 2000; static const size_t kNumIterations = 3; class TestWritableFile; class FaultInjectionTestEnv; namespace { // Assume a filename, and not a directory name like "/foo/bar/" static std::string GetDirName(const std::string filename) { size_t found = filename.find_last_of("/\\"); if (found == std::string::npos) { return ""; } else { return filename.substr(0, found); } } // Trim the tailing "/" in the end of `str` static std::string TrimDirname(const std::string& str) { size_t found = str.find_last_not_of("/"); if (found == std::string::npos) { return str; } return str.substr(0, found + 1); } // Return pair of a full path. static std::pair GetDirAndName( const std::string& name) { std::string dirname = GetDirName(name); std::string fname = name.substr(dirname.size() + 1); return std::make_pair(dirname, fname); } // A basic file truncation function suitable for this test. Status Truncate(const std::string& filename, uint64_t length) { rocksdb::Env* env = rocksdb::Env::Default(); unique_ptr orig_file; const EnvOptions options; Status s = env->NewSequentialFile(filename, &orig_file, options); if (!s.ok()) return s; char* scratch = new char[length]; rocksdb::Slice result; s = orig_file->Read(length, &result, scratch); if (s.ok()) { std::string tmp_name = GetDirName(filename) + "/truncate.tmp"; unique_ptr tmp_file; s = env->NewWritableFile(tmp_name, &tmp_file, options); if (s.ok()) { s = tmp_file->Append(result); if (s.ok()) { s = env->RenameFile(tmp_name, filename); } else { env->DeleteFile(tmp_name); } } } delete[] scratch; return s; } struct FileState { std::string filename_; ssize_t pos_; ssize_t pos_at_last_sync_; ssize_t pos_at_last_flush_; explicit FileState(const std::string& filename) : filename_(filename), pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) { } FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {} bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; } Status DropUnsyncedData() const; }; } // anonymous namespace // A wrapper around WritableFile which informs another Env whenever this file // is written to or sync'ed. class TestWritableFile : public WritableFile { public: explicit TestWritableFile(const std::string& fname, unique_ptr&& f, FaultInjectionTestEnv* env); virtual ~TestWritableFile(); virtual Status Append(const Slice& data); virtual Status Close(); virtual Status Flush(); virtual Status Sync(); private: FileState state_; unique_ptr target_; bool writable_file_opened_; FaultInjectionTestEnv* env_; }; class TestDirectory : public Directory { public: explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, Directory* dir) : env_(env), dirname_(dirname), dir_(dir) {} ~TestDirectory() {} virtual Status Fsync() override; private: FaultInjectionTestEnv* env_; std::string dirname_; unique_ptr dir_; }; class FaultInjectionTestEnv : public EnvWrapper { public: explicit FaultInjectionTestEnv(Env* base) : EnvWrapper(base), filesystem_active_(true) {} virtual ~FaultInjectionTestEnv() { } Status NewDirectory(const std::string& name, unique_ptr* result) override { unique_ptr r; Status s = target()->NewDirectory(name, &r); ASSERT_OK(s); if (!s.ok()) { return s; } result->reset(new TestDirectory(this, TrimDirname(name), r.release())); return Status::OK(); } Status NewWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& soptions) { Status s = target()->NewWritableFile(fname, result, soptions); if (s.ok()) { result->reset(new TestWritableFile(fname, std::move(*result), this)); // WritableFile doesn't append to files, so if the same file is opened // again then it will be truncated - so forget our saved state. UntrackFile(fname); MutexLock l(&mutex_); auto dir_and_name = GetDirAndName(fname); auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; list.insert(dir_and_name.second); } return s; } virtual Status DeleteFile(const std::string& f) { Status s = EnvWrapper::DeleteFile(f); ASSERT_OK(s); if (s.ok()) { UntrackFile(f); } return s; } virtual Status RenameFile(const std::string& s, const std::string& t) { Status ret = EnvWrapper::RenameFile(s, t); if (ret.ok()) { MutexLock l(&mutex_); if (db_file_state_.find(s) != db_file_state_.end()) { db_file_state_[t] = db_file_state_[s]; db_file_state_.erase(s); } auto sdn = GetDirAndName(s); auto tdn = GetDirAndName(t); if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) { auto& tlist = dir_to_new_files_since_last_sync_[tdn.first]; assert(tlist.find(tdn.second) == tlist.end()); tlist.insert(tdn.second); } } return ret; } void WritableFileClosed(const FileState& state) { MutexLock l(&mutex_); db_file_state_[state.filename_] = state; } Status DropUnsyncedFileData() { Status s; MutexLock l(&mutex_); for (std::map::const_iterator it = db_file_state_.begin(); s.ok() && it != db_file_state_.end(); ++it) { const FileState& state = it->second; if (!state.IsFullySynced()) { s = state.DropUnsyncedData(); } } return s; } Status DeleteFilesCreatedAfterLastDirSync() { // Because DeleteFile access this container make a copy to avoid deadlock std::map> map_copy; { MutexLock l(&mutex_); map_copy.insert(dir_to_new_files_since_last_sync_.begin(), dir_to_new_files_since_last_sync_.end()); } for (auto& pair : map_copy) { for (std::string name : pair.second) { Status s = DeleteFile(pair.first + "/" + name); } } return Status::OK(); } void ResetState() { MutexLock l(&mutex_); db_file_state_.clear(); dir_to_new_files_since_last_sync_.clear(); SetFilesystemActive(true); } void UntrackFile(const std::string& f) { MutexLock l(&mutex_); auto dir_and_name = GetDirAndName(f); dir_to_new_files_since_last_sync_[dir_and_name.first].erase( dir_and_name.second); db_file_state_.erase(f); } void SyncDir(const std::string& dirname) { dir_to_new_files_since_last_sync_.erase(dirname); } // Setting the filesystem to inactive is the test equivalent to simulating a // system reset. Setting to inactive will freeze our saved filesystem state so // that it will stop being recorded. It can then be reset back to the state at // the time of the reset. bool IsFilesystemActive() const { return filesystem_active_; } void SetFilesystemActive(bool active) { filesystem_active_ = active; } private: port::Mutex mutex_; std::map db_file_state_; std::unordered_map> dir_to_new_files_since_last_sync_; bool filesystem_active_; // Record flushes, syncs, writes }; Status FileState::DropUnsyncedData() const { ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; return Truncate(filename_, sync_pos); } Status TestDirectory::Fsync() { env_->SyncDir(dirname_); return dir_->Fsync(); } TestWritableFile::TestWritableFile(const std::string& fname, unique_ptr&& f, FaultInjectionTestEnv* env) : state_(fname), target_(std::move(f)), writable_file_opened_(true), env_(env) { assert(target_ != nullptr); state_.pos_ = 0; } TestWritableFile::~TestWritableFile() { if (writable_file_opened_) { Close(); } } Status TestWritableFile::Append(const Slice& data) { Status s = target_->Append(data); if (s.ok() && env_->IsFilesystemActive()) { state_.pos_ += data.size(); } return s; } Status TestWritableFile::Close() { writable_file_opened_ = false; Status s = target_->Close(); if (s.ok()) { env_->WritableFileClosed(state_); } return s; } Status TestWritableFile::Flush() { Status s = target_->Flush(); if (s.ok() && env_->IsFilesystemActive()) { state_.pos_at_last_flush_ = state_.pos_; } return s; } Status TestWritableFile::Sync() { if (!env_->IsFilesystemActive()) { return Status::OK(); } // No need to actual sync. state_.pos_at_last_sync_ = state_.pos_; return Status::OK(); } class FaultInjectionTest { protected: enum OptionConfig { kDefault, kDifferentDataDir, kWalDir, kSyncWal, kWalDirSyncWal, kEnd, }; int option_config_; // When need to make sure data is persistent, sync WAL bool sync_use_wal_; // When need to make sure data is persistent, call DB::CompactRange() bool sync_use_compact_; protected: public: enum ExpectedVerifResult { kValExpectFound, kValExpectNoError }; enum ResetMethod { kResetDropUnsyncedData, kResetDeleteUnsyncedFiles, kResetDropAndDeleteUnsynced }; FaultInjectionTestEnv* env_; std::string dbname_; shared_ptr tiny_cache_; Options options_; DB* db_; FaultInjectionTest() : option_config_(kDefault), sync_use_wal_(false), sync_use_compact_(true), env_(NULL), db_(NULL) { NewDB(); } ~FaultInjectionTest() { ASSERT_OK(TearDown()); } bool ChangeOptions() { option_config_++; if (option_config_ >= kEnd) { return false; } else { return true; } } // Return the current option configuration. Options CurrentOptions() { sync_use_wal_ = false; sync_use_compact_ = true; Options options; switch (option_config_) { case kWalDir: options.wal_dir = test::TmpDir(env_) + "/fault_test_wal"; break; case kDifferentDataDir: options.db_paths.emplace_back(test::TmpDir(env_) + "/fault_test_data", 1000000U); break; case kSyncWal: sync_use_wal_ = true; sync_use_compact_ = false; break; case kWalDirSyncWal: options.wal_dir = test::TmpDir(env_) + "/fault_test_wal"; sync_use_wal_ = true; sync_use_compact_ = false; break; default: break; } return options; } Status NewDB() { assert(db_ == NULL); assert(tiny_cache_ == nullptr); assert(env_ == NULL); env_ = new FaultInjectionTestEnv(Env::Default()); options_ = CurrentOptions(); options_.env = env_; options_.paranoid_checks = true; BlockBasedTableOptions table_options; tiny_cache_ = NewLRUCache(100); table_options.block_cache = tiny_cache_; options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); dbname_ = test::TmpDir() + "/fault_test"; ASSERT_OK(DestroyDB(dbname_, options_)); options_.create_if_missing = true; Status s = OpenDB(); options_.create_if_missing = false; return s; } Status SetUp() { Status s = TearDown(); if (s.ok()) { s = NewDB(); } return s; } Status TearDown() { CloseDB(); Status s = DestroyDB(dbname_, options_); delete env_; env_ = NULL; tiny_cache_.reset(); return s; } void Build(const WriteOptions& write_options, int start_idx, int num_vals) { std::string key_space, value_space; WriteBatch batch; for (int i = start_idx; i < start_idx + num_vals; i++) { Slice key = Key(i, &key_space); batch.Clear(); batch.Put(key, Value(i, &value_space)); ASSERT_OK(db_->Write(write_options, &batch)); } } Status ReadValue(int i, std::string* val) const { std::string key_space, value_space; Slice key = Key(i, &key_space); Value(i, &value_space); ReadOptions options; return db_->Get(options, key, val); } Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected) const { std::string val; std::string value_space; Status s; for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) { Value(i, &value_space); s = ReadValue(i, &val); if (s.ok()) { ASSERT_EQ(value_space, val); } if (expected == kValExpectFound) { if (!s.ok()) { fprintf(stderr, "Error when read %dth record (expect found): %s\n", i, s.ToString().c_str()); return s; } } else if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "Error when read %dth record: %s\n", i, s.ToString().c_str()); return s; } } return Status::OK(); } // Return the ith key Slice Key(int i, std::string* storage) const { char buf[100]; snprintf(buf, sizeof(buf), "%016d", i); storage->assign(buf, strlen(buf)); return Slice(*storage); } // Return the value to associate with the specified key Slice Value(int k, std::string* storage) const { Random r(k); return test::RandomString(&r, kValueSize, storage); } Status OpenDB() { delete db_; db_ = NULL; env_->ResetState(); return DB::Open(options_, dbname_, &db_); } void CloseDB() { delete db_; db_ = NULL; } void DeleteAllData() { Iterator* iter = db_->NewIterator(ReadOptions()); WriteOptions options; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(db_->Delete(WriteOptions(), iter->key())); } delete iter; FlushOptions flush_options; flush_options.wait = true; db_->Flush(flush_options); } void ResetDBState(ResetMethod reset_method) { switch (reset_method) { case kResetDropUnsyncedData: ASSERT_OK(env_->DropUnsyncedFileData()); break; case kResetDeleteUnsyncedFiles: ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync()); break; case kResetDropAndDeleteUnsynced: ASSERT_OK(env_->DropUnsyncedFileData()); ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync()); break; default: assert(false); } } void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) { DeleteAllData(); WriteOptions write_options; write_options.sync = sync_use_wal_; Build(write_options, 0, num_pre_sync); if (sync_use_compact_) { db_->CompactRange(nullptr, nullptr); } write_options.sync = false; Build(write_options, num_pre_sync, num_post_sync); } void PartialCompactTestReopenWithFault(ResetMethod reset_method, int num_pre_sync, int num_post_sync) { env_->SetFilesystemActive(false); CloseDB(); ResetDBState(reset_method); ASSERT_OK(OpenDB()); ASSERT_OK(Verify(0, num_pre_sync, FaultInjectionTest::kValExpectFound)); ASSERT_OK(Verify(num_pre_sync, num_post_sync, FaultInjectionTest::kValExpectNoError)); } void NoWriteTestPreFault() { } void NoWriteTestReopenWithFault(ResetMethod reset_method) { CloseDB(); ResetDBState(reset_method); ASSERT_OK(OpenDB()); } }; TEST(FaultInjectionTest, FaultTest) { do { Random rnd(301); ASSERT_OK(SetUp()); for (size_t idx = 0; idx < kNumIterations; idx++) { int num_pre_sync = rnd.Uniform(kMaxNumValues); int num_post_sync = rnd.Uniform(kMaxNumValues); PartialCompactTestPreFault(num_pre_sync, num_post_sync); PartialCompactTestReopenWithFault(kResetDropUnsyncedData, num_pre_sync, num_post_sync); NoWriteTestPreFault(); NoWriteTestReopenWithFault(kResetDropUnsyncedData); // TODO(t6070540) Need to sync WAL Dir and other DB paths too. // Setting a separate data path won't pass the test as we don't sync // it after creating new files, if (option_config_ != kDifferentDataDir) { PartialCompactTestPreFault(num_pre_sync, num_post_sync); // Since we don't sync WAL Dir, this test dosn't pass. if (option_config_ != kWalDirSyncWal) { PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced, num_pre_sync, num_post_sync); } NoWriteTestPreFault(); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); PartialCompactTestPreFault(num_pre_sync, num_post_sync); // No new files created so we expect all values since no files will be // dropped. // WAL Dir is not synced for now. if (option_config_ != kWalDir && option_config_ != kWalDirSyncWal) { PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, num_pre_sync + num_post_sync, 0); } NoWriteTestPreFault(); NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles); } } } while (ChangeOptions()); } } // namespace rocksdb int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }