// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/db_test_util.h" #include "db/db_with_timestamp_test_util.h" #include "options/options_helper.h" #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/file_system.h" #include "test_util/sync_point.h" #include "util/udt_util.h" #include "utilities/fault_injection_env.h" #include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { class DBWALTestBase : public DBTestBase { protected: explicit DBWALTestBase(const std::string& dir_name) : DBTestBase(dir_name, /*env_do_fsync=*/true) {} #if defined(ROCKSDB_PLATFORM_POSIX) public: #if defined(ROCKSDB_FALLOCATE_PRESENT) bool IsFallocateSupported() { // Test fallocate support of running file system. // Skip this test if fallocate is not supported. 
std::string fname_test_fallocate = dbname_ + "/preallocate_testfile"; int fd = -1; do { fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); } while (fd < 0 && errno == EINTR); assert(fd > 0); int alloc_status = fallocate(fd, 0, 0, 1); int err_number = errno; close(fd); assert(env_->DeleteFile(fname_test_fallocate) == Status::OK()); if (err_number == ENOSYS || err_number == EOPNOTSUPP) { fprintf(stderr, "Skipped preallocated space check: %s\n", errnoStr(err_number).c_str()); return false; } assert(alloc_status == 0); return true; } #endif // ROCKSDB_FALLOCATE_PRESENT uint64_t GetAllocatedFileSize(std::string file_name) { struct stat sbuf; int err = stat(file_name.c_str(), &sbuf); assert(err == 0); return sbuf.st_blocks * 512; } #endif // ROCKSDB_PLATFORM_POSIX }; class DBWALTest : public DBWALTestBase { public: DBWALTest() : DBWALTestBase("/db_wal_test") {} }; // A SpecialEnv enriched to give more insight about deleted files class EnrichedSpecialEnv : public SpecialEnv { public: explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {} Status NewSequentialFile(const std::string& f, std::unique_ptr* r, const EnvOptions& soptions) override { InstrumentedMutexLock l(&env_mutex_); if (f == skipped_wal) { deleted_wal_reopened = true; if (IsWAL(f) && largest_deleted_wal.size() != 0 && f.compare(largest_deleted_wal) <= 0) { gap_in_wals = true; } } return SpecialEnv::NewSequentialFile(f, r, soptions); } Status DeleteFile(const std::string& fname) override { if (IsWAL(fname)) { deleted_wal_cnt++; InstrumentedMutexLock l(&env_mutex_); // If this is the first WAL, remember its name and skip deleting it. We // remember its name partly because the application might attempt to // delete the file again. 
if (skipped_wal.size() != 0 && skipped_wal != fname) { if (largest_deleted_wal.size() == 0 || largest_deleted_wal.compare(fname) < 0) { largest_deleted_wal = fname; } } else { skipped_wal = fname; return Status::OK(); } } return SpecialEnv::DeleteFile(fname); } bool IsWAL(const std::string& fname) { // printf("iswal %s\n", fname.c_str()); return fname.compare(fname.size() - 3, 3, "log") == 0; } InstrumentedMutex env_mutex_; // the wal whose actual delete was skipped by the env std::string skipped_wal; // the largest WAL that was requested to be deleted std::string largest_deleted_wal; // number of WALs that were successfully deleted std::atomic deleted_wal_cnt = {0}; // the WAL whose delete from fs was skipped is reopened during recovery std::atomic deleted_wal_reopened = {false}; // whether a gap in the WALs was detected during recovery std::atomic gap_in_wals = {false}; }; class DBWALTestWithEnrichedEnv : public DBTestBase { public: DBWALTestWithEnrichedEnv() : DBTestBase("db_wal_test", /*env_do_fsync=*/true) { enriched_env_ = new EnrichedSpecialEnv(env_->target()); auto options = CurrentOptions(); options.env = enriched_env_; options.allow_2pc = true; Reopen(options); delete env_; // to be deleted by the parent class env_ = enriched_env_; } protected: EnrichedSpecialEnv* enriched_env_; }; // Test that the recovery would successfully avoid the gaps between the logs. // One known scenario that could cause this is that the application issue the // WAL deletion out of order. For the sake of simplicity in the test, here we // create the gap by manipulating the env to skip deletion of the first WAL but // not the ones after it. 
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) { auto options = last_options_; // To cause frequent WAL deletion options.write_buffer_size = 128; Reopen(options); SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::PurgeObsoleteFiles:End", "DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush"}}); SyncPoint::GetInstance()->EnableProcessing(); WriteOptions writeOpt = WriteOptions(); for (int i = 0; i < 128 * 5; i++) { ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1")); } FlushOptions fo; fo.wait = true; ASSERT_OK(db_->Flush(fo)); TEST_SYNC_POINT("DBWALTestWithEnrichedEnv.SkipDeletedWALs:AfterFlush"); // some wals are deleted ASSERT_NE(0, enriched_env_->deleted_wal_cnt); // but not the first one ASSERT_NE(0, enriched_env_->skipped_wal.size()); // Test that the WAL that was not deleted will be skipped during recovery options = last_options_; Reopen(options); ASSERT_FALSE(enriched_env_->deleted_wal_reopened); ASSERT_FALSE(enriched_env_->gap_in_wals); SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(DBWALTest, WAL) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "bar")); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); // Both value's should be present. 
ASSERT_EQ("v2", Get(1, "bar")); ASSERT_EQ("v2", Get(1, "foo")); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); // again both values should be present. ASSERT_EQ("v3", Get(1, "foo")); ASSERT_EQ("v3", Get(1, "bar")); } while (ChangeWalOptions()); } TEST_F(DBWALTest, RollLog) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(Put(1, "baz", "v5")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); for (int i = 0; i < 10; i++) { ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); } ASSERT_OK(Put(1, "foo", "v4")); for (int i = 0; i < 10; i++) { ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); } } while (ChangeWalOptions()); } TEST_F(DBWALTest, SyncWALNotBlockWrite) { Options options = CurrentOptions(); options.max_write_buffer_number = 4; DestroyAndReopen(options); ASSERT_OK(Put("foo1", "bar1")); ASSERT_OK(Put("foo5", "bar5")); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"WritableFileWriter::SyncWithoutFlush:1", "DBWALTest::SyncWALNotBlockWrite:1"}, {"DBWALTest::SyncWALNotBlockWrite:2", "WritableFileWriter::SyncWithoutFlush:2"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::port::Thread thread([&]() { ASSERT_OK(db_->SyncWAL()); }); TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:1"); ASSERT_OK(Put("foo2", "bar2")); ASSERT_OK(Put("foo3", "bar3")); FlushOptions fo; fo.wait = false; ASSERT_OK(db_->Flush(fo)); ASSERT_OK(Put("foo4", "bar4")); TEST_SYNC_POINT("DBWALTest::SyncWALNotBlockWrite:2"); thread.join(); ASSERT_EQ(Get("foo1"), "bar1"); ASSERT_EQ(Get("foo2"), "bar2"); ASSERT_EQ(Get("foo3"), "bar3"); ASSERT_EQ(Get("foo4"), "bar4"); ASSERT_EQ(Get("foo5"), "bar5"); 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(DBWALTest, SyncWALNotWaitWrite) { ASSERT_OK(Put("foo1", "bar1")); ASSERT_OK(Put("foo3", "bar3")); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"SpecialEnv::WalFile::Append:1", "DBWALTest::SyncWALNotWaitWrite:1"}, {"DBWALTest::SyncWALNotWaitWrite:2", "SpecialEnv::WalFile::Append:2"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::port::Thread thread( [&]() { ASSERT_OK(Put("foo2", "bar2")); }); // Moving this to SyncWAL before the actual fsync // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); ASSERT_OK(db_->SyncWAL()); // Moving this to SyncWAL after actual fsync // TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); thread.join(); ASSERT_EQ(Get("foo1"), "bar1"); ASSERT_EQ(Get("foo2"), "bar2"); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(DBWALTest, Recover) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(Put(1, "baz", "v5")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v5", Get(1, "baz")); ASSERT_OK(Put(1, "bar", "v2")); ASSERT_OK(Put(1, "foo", "v3")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v3", Get(1, "foo")); ASSERT_OK(Put(1, "foo", "v4")); ASSERT_EQ("v4", Get(1, "foo")); ASSERT_EQ("v2", Get(1, "bar")); ASSERT_EQ("v5", Get(1, "baz")); } while (ChangeWalOptions()); } class DBWALTestWithTimestamp : public DBBasicTestWithTimestampBase, public testing::WithParamInterface { public: DBWALTestWithTimestamp() : DBBasicTestWithTimestampBase("db_wal_test_with_timestamp") {} Status CreateAndReopenWithTs(const std::vector& cfs, const Options& ts_options, bool persist_udt, bool avoid_flush_during_recovery = false) { Options default_options = CurrentOptions(); default_options.allow_concurrent_memtable_write = 
persist_udt ? true : false; DestroyAndReopen(default_options); CreateColumnFamilies(cfs, ts_options); return ReopenColumnFamiliesWithTs(cfs, ts_options, persist_udt, avoid_flush_during_recovery); } Status ReopenColumnFamiliesWithTs(const std::vector& cfs, Options ts_options, bool persist_udt, bool avoid_flush_during_recovery = false) { Options default_options = CurrentOptions(); default_options.create_if_missing = false; default_options.allow_concurrent_memtable_write = persist_udt ? true : false; default_options.avoid_flush_during_recovery = avoid_flush_during_recovery; ts_options.create_if_missing = false; std::vector cf_options(cfs.size(), ts_options); std::vector cfs_plus_default = cfs; cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName); cf_options.insert(cf_options.begin(), default_options); Close(); return TryReopenWithColumnFamilies(cfs_plus_default, cf_options); } Status Put(uint32_t cf, const Slice& key, const Slice& ts, const Slice& value) { WriteOptions write_opts; return db_->Put(write_opts, handles_[cf], key, ts, value); } void CheckGet(const ReadOptions& read_opts, uint32_t cf, const Slice& key, const std::string& expected_value, const std::string& expected_ts) { std::string actual_value; std::string actual_ts; ASSERT_OK( db_->Get(read_opts, handles_[cf], key, &actual_value, &actual_ts)); ASSERT_EQ(expected_value, actual_value); ASSERT_EQ(expected_ts, actual_ts); } }; TEST_P(DBWALTestWithTimestamp, RecoverAndNoFlush) { // Set up the option that enables user defined timestmp size. std::string ts1; PutFixed64(&ts1, 1); Options ts_options; ts_options.create_if_missing = true; ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); // Test that user-defined timestamps are recovered from WAL regardless of // the value of this flag because UDTs are saved in WAL nonetheless. 
// We however need to explicitly disable flush during recovery by setting // `avoid_flush_during_recovery=true` so that we can avoid timestamps getting // stripped when the `persist_user_defined_timestamps` flag is false, so that // all written timestamps are available for testing user-defined time travel // read. bool persist_udt = test::ShouldPersistUDT(GetParam()); ts_options.persist_user_defined_timestamps = persist_udt; bool avoid_flush_during_recovery = true; std::string full_history_ts_low; ReadOptions read_opts; do { Slice ts_slice = ts1; read_opts.timestamp = &ts_slice; ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt, avoid_flush_during_recovery)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); ASSERT_OK(Put(1, "foo", ts1, "v1")); ASSERT_OK(Put(1, "baz", ts1, "v5")); ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt, avoid_flush_during_recovery)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); // Do a timestamped read with ts1 after second reopen. CheckGet(read_opts, 1, "foo", "v1", ts1); CheckGet(read_opts, 1, "baz", "v5", ts1); // Write more value versions for key "foo" and "bar" before and after second // reopen. std::string ts2; PutFixed64(&ts2, 2); ASSERT_OK(Put(1, "bar", ts2, "v2")); ASSERT_OK(Put(1, "foo", ts2, "v3")); ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt, avoid_flush_during_recovery)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); std::string ts3; PutFixed64(&ts3, 3); ASSERT_OK(Put(1, "foo", ts3, "v4")); // All the key value pairs available for read: // "foo" -> [(ts1, "v1"), (ts2, "v3"), (ts3, "v4")] // "bar" -> [(ts2, "v2")] // "baz" -> [(ts1, "v5")] // Do a timestamped read with ts1 after third reopen. 
// read_opts.timestamp is set to ts1 for below reads CheckGet(read_opts, 1, "foo", "v1", ts1); std::string value; ASSERT_TRUE(db_->Get(read_opts, handles_[1], "bar", &value).IsNotFound()); CheckGet(read_opts, 1, "baz", "v5", ts1); // Do a timestamped read with ts2 after third reopen. ts_slice = ts2; // read_opts.timestamp is set to ts2 for below reads. CheckGet(read_opts, 1, "foo", "v3", ts2); CheckGet(read_opts, 1, "bar", "v2", ts2); CheckGet(read_opts, 1, "baz", "v5", ts1); // Do a timestamped read with ts3 after third reopen. ts_slice = ts3; // read_opts.timestamp is set to ts3 for below reads. CheckGet(read_opts, 1, "foo", "v4", ts3); CheckGet(read_opts, 1, "bar", "v2", ts2); CheckGet(read_opts, 1, "baz", "v5", ts1); ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low)); ASSERT_TRUE(full_history_ts_low.empty()); } while (ChangeWalOptions()); } TEST_P(DBWALTestWithTimestamp, RecoverAndFlush) { // Set up the option that enables user defined timestamp size. std::string min_ts; std::string write_ts; PutFixed64(&min_ts, 0); PutFixed64(&write_ts, 1); Options ts_options; ts_options.create_if_missing = true; ts_options.comparator = test::BytewiseComparatorWithU64TsWrapper(); bool persist_udt = test::ShouldPersistUDT(GetParam()); ts_options.persist_user_defined_timestamps = persist_udt; std::string smallest_ukey_without_ts = "baz"; std::string largest_ukey_without_ts = "foo"; ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, ts_options, persist_udt)); // No flush, no sst files, because of no data. ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 0U); ASSERT_OK(Put(1, largest_ukey_without_ts, write_ts, "v1")); ASSERT_OK(Put(1, smallest_ukey_without_ts, write_ts, "v5")); ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, ts_options, persist_udt)); // Memtable recovered from WAL flushed because `avoid_flush_during_recovery` // defaults to false, created one L0 file. 
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), 1U); std::vector> level_to_files; dbfull()->TEST_GetFilesMetaData(handles_[1], &level_to_files); std::string full_history_ts_low; ASSERT_OK(db_->GetFullHistoryTsLow(handles_[1], &full_history_ts_low)); ASSERT_GT(level_to_files.size(), 1); // L0 only has one SST file. ASSERT_EQ(level_to_files[0].size(), 1); auto meta = level_to_files[0][0]; if (persist_udt) { ASSERT_EQ(smallest_ukey_without_ts + write_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + write_ts, meta.largest.user_key()); ASSERT_TRUE(full_history_ts_low.empty()); } else { ASSERT_EQ(smallest_ukey_without_ts + min_ts, meta.smallest.user_key()); ASSERT_EQ(largest_ukey_without_ts + min_ts, meta.largest.user_key()); std::string effective_cutoff; Slice write_ts_slice = write_ts; GetFullHistoryTsLowFromU64CutoffTs(&write_ts_slice, &effective_cutoff); ASSERT_EQ(effective_cutoff, full_history_ts_low); } } // Param 0: test mode for the user-defined timestamp feature INSTANTIATE_TEST_CASE_P( P, DBWALTestWithTimestamp, ::testing::Values( test::UserDefinedTimestampTestMode::kStripUserDefinedTimestamp, test::UserDefinedTimestampTestMode::kNormal)); TEST_F(DBWALTestWithTimestamp, EnableDisableUDT) { Options options; options.create_if_missing = true; options.comparator = BytewiseComparator(); bool avoid_flush_during_recovery = true; ASSERT_OK(CreateAndReopenWithTs({"pikachu"}, options, true /* persist_udt */, avoid_flush_during_recovery)); ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "v1")); ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", "v5")); options.comparator = test::BytewiseComparatorWithU64TsWrapper(); options.persist_user_defined_timestamps = false; // Test handle timestamp size inconsistency in WAL when enabling user-defined // timestamps. 
ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options, false /* persist_udt */, avoid_flush_during_recovery)); std::string ts; PutFixed64(&ts, 0); Slice ts_slice = ts; ReadOptions read_opts; read_opts.timestamp = &ts_slice; // Pre-existing entries are treated as if they have the min timestamp. CheckGet(read_opts, 1, "foo", "v1", ts); CheckGet(read_opts, 1, "baz", "v5", ts); ts.clear(); PutFixed64(&ts, 1); ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", ts, "v2")); ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "baz", ts, "v6")); CheckGet(read_opts, 1, "foo", "v2", ts); CheckGet(read_opts, 1, "baz", "v6", ts); options.comparator = BytewiseComparator(); // Open the column family again with the UDT feature disabled. Test handle // timestamp size inconsistency in WAL when disabling user-defined timestamps ASSERT_OK(ReopenColumnFamiliesWithTs({"pikachu"}, options, true /* persist_udt */, avoid_flush_during_recovery)); ASSERT_EQ("v2", Get(1, "foo")); ASSERT_EQ("v6", Get(1, "baz")); } TEST_F(DBWALTest, RecoverWithTableHandle) { do { Options options = CurrentOptions(); options.create_if_missing = true; options.disable_auto_compactions = true; options.avoid_flush_during_recovery = false; DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(Put(1, "bar", "v2")); ASSERT_OK(Flush(1)); ASSERT_OK(Put(1, "foo", "v3")); ASSERT_OK(Put(1, "bar", "v4")); ASSERT_OK(Flush(1)); ASSERT_OK(Put(1, "big", std::string(100, 'a'))); options = CurrentOptions(); const int kSmallMaxOpenFiles = 13; if (option_config_ == kDBLogDir) { // Use this option to check not preloading files // Set the max open files to be small enough so no preload will // happen. options.max_open_files = kSmallMaxOpenFiles; // RocksDB sanitize max open files to at least 20. Modify it back. 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) { int* max_open_files = static_cast(arg); *max_open_files = kSmallMaxOpenFiles; }); } else if (option_config_ == kWalDirAndMmapReads) { // Use this option to check always loading all files. options.max_open_files = 100; } else { options.max_open_files = -1; } ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ReopenWithColumnFamilies({"default", "pikachu"}, options); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); std::vector> files; dbfull()->TEST_GetFilesMetaData(handles_[1], &files); size_t total_files = 0; for (const auto& level : files) { total_files += level.size(); } ASSERT_EQ(total_files, 3); for (const auto& level : files) { for (const auto& file : level) { if (options.max_open_files == kSmallMaxOpenFiles) { ASSERT_TRUE(file.table_reader_handle == nullptr); } else { ASSERT_TRUE(file.table_reader_handle != nullptr); } } } } while (ChangeWalOptions()); } TEST_F(DBWALTest, RecoverWithBlob) { // Write a value that's below the prospective size limit for blobs and another // one that's above. Note that blob files are not actually enabled at this // point. constexpr uint64_t min_blob_size = 10; constexpr char short_value[] = "short"; static_assert(sizeof(short_value) - 1 < min_blob_size, "short_value too long"); constexpr char long_value[] = "long_value"; static_assert(sizeof(long_value) - 1 >= min_blob_size, "long_value too short"); ASSERT_OK(Put("key1", short_value)); ASSERT_OK(Put("key2", long_value)); // There should be no files just yet since we haven't flushed. 
{ VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ASSERT_NE(cfd, nullptr); Version* const current = cfd->current(); ASSERT_NE(current, nullptr); const VersionStorageInfo* const storage_info = current->storage_info(); ASSERT_NE(storage_info, nullptr); ASSERT_EQ(storage_info->num_non_empty_levels(), 0); ASSERT_TRUE(storage_info->GetBlobFiles().empty()); } // Reopen the database with blob files enabled. A new table file/blob file // pair should be written during recovery. Options options; options.enable_blob_files = true; options.min_blob_size = min_blob_size; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; options.env = env_; Reopen(options); ASSERT_EQ(Get("key1"), short_value); ASSERT_EQ(Get("key2"), long_value); VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ASSERT_NE(cfd, nullptr); Version* const current = cfd->current(); ASSERT_NE(current, nullptr); const VersionStorageInfo* const storage_info = current->storage_info(); ASSERT_NE(storage_info, nullptr); const auto& l0_files = storage_info->LevelFiles(0); ASSERT_EQ(l0_files.size(), 1); const FileMetaData* const table_file = l0_files[0]; ASSERT_NE(table_file, nullptr); const auto& blob_files = storage_info->GetBlobFiles(); ASSERT_EQ(blob_files.size(), 1); const auto& blob_file = blob_files.front(); ASSERT_NE(blob_file, nullptr); ASSERT_EQ(table_file->smallest.user_key(), "key1"); ASSERT_EQ(table_file->largest.user_key(), "key2"); ASSERT_EQ(table_file->fd.smallest_seqno, 1); ASSERT_EQ(table_file->fd.largest_seqno, 2); ASSERT_EQ(table_file->oldest_blob_file_number, blob_file->GetBlobFileNumber()); ASSERT_EQ(blob_file->GetTotalBlobCount(), 1); const InternalStats* const internal_stats = cfd->internal_stats(); ASSERT_NE(internal_stats, nullptr); const auto& 
compaction_stats = internal_stats->TEST_GetCompactionStats(); ASSERT_FALSE(compaction_stats.empty()); ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize()); ASSERT_EQ(compaction_stats[0].bytes_written_blob, blob_file->GetTotalBlobBytes()); ASSERT_EQ(compaction_stats[0].num_output_files, 1); ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1); const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue(); ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], compaction_stats[0].bytes_written + compaction_stats[0].bytes_written_blob); } TEST_F(DBWALTest, RecoverWithBlobMultiSST) { // Write several large (4 KB) values without flushing. Note that blob files // are not actually enabled at this point. std::string large_value(1 << 12, 'a'); constexpr int num_keys = 64; for (int i = 0; i < num_keys; ++i) { ASSERT_OK(Put(Key(i), large_value)); } // There should be no files just yet since we haven't flushed. { VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ASSERT_NE(cfd, nullptr); Version* const current = cfd->current(); ASSERT_NE(current, nullptr); const VersionStorageInfo* const storage_info = current->storage_info(); ASSERT_NE(storage_info, nullptr); ASSERT_EQ(storage_info->num_non_empty_levels(), 0); ASSERT_TRUE(storage_info->GetBlobFiles().empty()); } // Reopen the database with blob files enabled and write buffer size set to a // smaller value. Multiple table files+blob files should be written and added // to the Version during recovery. 
Options options; options.write_buffer_size = 1 << 16; // 64 KB options.enable_blob_files = true; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; options.env = env_; Reopen(options); for (int i = 0; i < num_keys; ++i) { ASSERT_EQ(Get(Key(i)), large_value); } VersionSet* const versions = dbfull()->GetVersionSet(); ASSERT_NE(versions, nullptr); ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); ASSERT_NE(cfd, nullptr); Version* const current = cfd->current(); ASSERT_NE(current, nullptr); const VersionStorageInfo* const storage_info = current->storage_info(); ASSERT_NE(storage_info, nullptr); const auto& l0_files = storage_info->LevelFiles(0); ASSERT_GT(l0_files.size(), 1); const auto& blob_files = storage_info->GetBlobFiles(); ASSERT_GT(blob_files.size(), 1); ASSERT_EQ(l0_files.size(), blob_files.size()); } TEST_F(DBWALTest, WALWithChecksumHandoff) { #ifndef ROCKSDB_ASSERT_STATUS_CHECKED if (mem_env_ || encrypted_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment"); return; } std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); do { Options options = CurrentOptions(); options.checksum_handoff_file_types.Add(FileType::kWalFile); options.env = fault_fs_env.get(); fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); CreateAndReopenWithCF({"pikachu"}, options); WriteOptions writeOpt = WriteOptions(); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v1")); ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v1")); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("v1", Get(1, "bar")); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v2")); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v2")); ReopenWithColumnFamilies({"default", "pikachu"}, 
options); // Both value's should be present. ASSERT_EQ("v2", Get(1, "bar")); ASSERT_EQ("v2", Get(1, "foo")); writeOpt.disableWAL = true; // This put, data is persisted by Flush ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v3")); ReopenWithColumnFamilies({"default", "pikachu"}, options); writeOpt.disableWAL = false; // Data is persisted in the WAL ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "zoo", "v3")); // The hash does not match, write fails fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash); writeOpt.disableWAL = false; ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v3")); ReopenWithColumnFamilies({"default", "pikachu"}, options); // Due to the write failure, Get should not find ASSERT_NE("v3", Get(1, "foo")); ASSERT_EQ("v3", Get(1, "zoo")); ASSERT_EQ("v3", Get(1, "bar")); fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); // Each write will be similated as corrupted. fault_fs->IngestDataCorruptionBeforeWrite(); writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v4")); writeOpt.disableWAL = false; ASSERT_NOK(dbfull()->Put(writeOpt, handles_[1], "foo", "v4")); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_NE("v4", Get(1, "foo")); ASSERT_NE("v4", Get(1, "bar")); fault_fs->NoDataCorruptionBeforeWrite(); fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum); // The file system does not provide checksum method and verification. 
writeOpt.disableWAL = true; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "bar", "v5")); writeOpt.disableWAL = false; ASSERT_OK(dbfull()->Put(writeOpt, handles_[1], "foo", "v5")); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_EQ("v5", Get(1, "foo")); ASSERT_EQ("v5", Get(1, "bar")); Destroy(options); } while (ChangeWalOptions()); #endif // ROCKSDB_ASSERT_STATUS_CHECKED } TEST_F(DBWALTest, LockWal) { do { Options options = CurrentOptions(); options.create_if_missing = true; DestroyAndReopen(options); ASSERT_OK(Put("foo", "v")); ASSERT_OK(Put("bar", "v")); ASSERT_OK(db_->LockWAL()); // Verify writes are stopped WriteOptions wopts; wopts.no_slowdown = true; Status s = db_->Put(wopts, "foo", "dontcare"); ASSERT_TRUE(s.IsIncomplete()); { VectorLogPtr wals; ASSERT_OK(db_->GetSortedWalFiles(wals)); ASSERT_FALSE(wals.empty()); } port::Thread worker([&]() { Status tmp_s = db_->Flush(FlushOptions()); ASSERT_OK(tmp_s); }); FlushOptions flush_opts; flush_opts.wait = false; s = db_->Flush(flush_opts); ASSERT_TRUE(s.IsTryAgain()); ASSERT_OK(db_->UnlockWAL()); ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare")); worker.join(); } while (ChangeWalOptions()); } class DBRecoveryTestBlobError : public DBWALTest, public testing::WithParamInterface { public: DBRecoveryTestBlobError() : sync_point_(GetParam()) {} std::string sync_point_; }; INSTANTIATE_TEST_CASE_P(DBRecoveryTestBlobError, DBRecoveryTestBlobError, ::testing::ValuesIn(std::vector{ "BlobFileBuilder::WriteBlobToFile:AddRecord", "BlobFileBuilder::WriteBlobToFile:AppendFooter"})); TEST_P(DBRecoveryTestBlobError, RecoverWithBlobError) { // Write a value. Note that blob files are not actually enabled at this point. ASSERT_OK(Put("key", "blob")); // Reopen with blob files enabled but make blob file writing fail during // recovery. 
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { Status* const s = static_cast(arg); assert(s); (*s) = Status::IOError(sync_point_); }); SyncPoint::GetInstance()->EnableProcessing(); Options options; options.enable_blob_files = true; options.avoid_flush_during_recovery = false; options.disable_auto_compactions = true; options.env = env_; ASSERT_NOK(TryReopen(options)); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); // Make sure the files generated by the failed recovery have been deleted. std::vector files; ASSERT_OK(env_->GetChildren(dbname_, &files)); for (const auto& file : files) { uint64_t number = 0; FileType type = kTableFile; if (!ParseFileName(file, &number, &type)) { continue; } ASSERT_NE(type, kTableFile); ASSERT_NE(type, kBlobFile); } } TEST_F(DBWALTest, IgnoreRecoveredLog) { std::string backup_logs = dbname_ + "/backup_logs"; do { // delete old files in backup_logs directory ASSERT_OK(env_->CreateDirIfMissing(backup_logs)); std::vector old_files; ASSERT_OK(env_->GetChildren(backup_logs, &old_files)); for (auto& file : old_files) { ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file)); } Options options = CurrentOptions(); options.create_if_missing = true; options.merge_operator = MergeOperators::CreateUInt64AddOperator(); options.wal_dir = dbname_ + "/logs"; DestroyAndReopen(options); // fill up the DB std::string one, two; PutFixed64(&one, 1); PutFixed64(&two, 2); ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); ASSERT_OK(db_->Merge(WriteOptions(), Slice("foo"), Slice(one))); ASSERT_OK(db_->Merge(WriteOptions(), Slice("bar"), Slice(one))); // copy the logs to backup std::vector logs; ASSERT_OK(env_->GetChildren(options.wal_dir, &logs)); for (auto& log : logs) { CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log); } // recover the DB Reopen(options); ASSERT_EQ(two, Get("foo")); ASSERT_EQ(one, Get("bar")); Close(); // copy the logs from backup back to wal dir 
for (auto& log : logs) { CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); } // this should ignore the log files, recovery should not happen again // if the recovery happens, the same merge operator would be called twice, // leading to incorrect results Reopen(options); ASSERT_EQ(two, Get("foo")); ASSERT_EQ(one, Get("bar")); Close(); Destroy(options); Reopen(options); Close(); // copy the logs from backup back to wal dir ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir)); for (auto& log : logs) { CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); } // assert that we successfully recovered only from logs, even though we // destroyed the DB Reopen(options); ASSERT_EQ(two, Get("foo")); ASSERT_EQ(one, Get("bar")); // Recovery will fail if DB directory doesn't exist. Destroy(options); // copy the logs from backup back to wal dir ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir)); for (auto& log : logs) { CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log); // we won't be needing this file no more ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log)); } Status s = TryReopen(options); ASSERT_NOK(s); Destroy(options); } while (ChangeWalOptions()); } TEST_F(DBWALTest, RecoveryWithEmptyLog) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(Put(1, "foo", "v2")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v3")); ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); ASSERT_EQ("v3", Get(1, "foo")); } while (ChangeWalOptions()); } #if !(defined NDEBUG) || !defined(OS_WIN) TEST_F(DBWALTest, PreallocateBlock) { Options options = CurrentOptions(); options.write_buffer_size = 10 * 1000 * 1000; options.max_total_wal_size = 0; size_t expected_preallocation_size = static_cast( options.write_buffer_size + options.write_buffer_size / 10); DestroyAndReopen(options); 
std::atomic called(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { ASSERT_TRUE(arg != nullptr); size_t preallocation_size = *(static_cast(arg)); ASSERT_EQ(expected_preallocation_size, preallocation_size); called.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("", "")); ASSERT_OK(Flush()); ASSERT_OK(Put("", "")); Close(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_EQ(2, called.load()); options.max_total_wal_size = 1000 * 1000; expected_preallocation_size = static_cast(options.max_total_wal_size); Reopen(options); called.store(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { ASSERT_TRUE(arg != nullptr); size_t preallocation_size = *(static_cast(arg)); ASSERT_EQ(expected_preallocation_size, preallocation_size); called.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("", "")); ASSERT_OK(Flush()); ASSERT_OK(Put("", "")); Close(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_EQ(2, called.load()); options.db_write_buffer_size = 800 * 1000; expected_preallocation_size = static_cast(options.db_write_buffer_size); Reopen(options); called.store(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { ASSERT_TRUE(arg != nullptr); size_t preallocation_size = *(static_cast(arg)); ASSERT_EQ(expected_preallocation_size, preallocation_size); called.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("", "")); ASSERT_OK(Flush()); ASSERT_OK(Put("", "")); Close(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_EQ(2, called.load()); expected_preallocation_size = 700 * 1000; std::shared_ptr write_buffer_manager = std::make_shared(static_cast(700 * 1000)); 
options.write_buffer_manager = write_buffer_manager; Reopen(options); called.store(0); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "DBTestWalFile.GetPreallocationStatus", [&](void* arg) { ASSERT_TRUE(arg != nullptr); size_t preallocation_size = *(static_cast(arg)); ASSERT_EQ(expected_preallocation_size, preallocation_size); called.fetch_add(1); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Put("", "")); ASSERT_OK(Flush()); ASSERT_OK(Put("", "")); Close(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_EQ(2, called.load()); } #endif // !(defined NDEBUG) || !defined(OS_WIN) TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) { // TODO(ajkr): Disabled until WAL recycling is fixed for // `kPointInTimeRecovery`. // For github issue #1303 for (int i = 0; i < 2; ++i) { Options options = CurrentOptions(); options.create_if_missing = true; options.recycle_log_file_num = 2; if (i != 0) { options.wal_dir = alternative_wal_dir_; } DestroyAndReopen(options); ASSERT_OK(Put("foo", "v1")); VectorLogPtr log_files; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); ASSERT_GT(log_files.size(), 0); ASSERT_OK(Flush()); // Now the original WAL is in log_files[0] and should be marked for // recycling. // Verify full purge cannot remove this file. JobContext job_context(0); dbfull()->TEST_LockMutex(); dbfull()->FindObsoleteFiles(&job_context, true /* force */); dbfull()->TEST_UnlockMutex(); dbfull()->PurgeObsoleteFiles(job_context); if (i == 0) { ASSERT_OK( env_->FileExists(LogFileName(dbname_, log_files[0]->LogNumber()))); } else { ASSERT_OK(env_->FileExists( LogFileName(alternative_wal_dir_, log_files[0]->LogNumber()))); } } } TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) { // TODO(ajkr): Disabled until WAL recycling is fixed for // `kPointInTimeRecovery`. // Ensures full purge cannot delete a WAL while it's in the process of being // recycled. 
In particular, we force the full purge after a file has been // chosen for reuse, but before it has been renamed. for (int i = 0; i < 2; ++i) { Options options = CurrentOptions(); options.recycle_log_file_num = 1; if (i != 0) { options.wal_dir = alternative_wal_dir_; } DestroyAndReopen(options); // The first flush creates a second log so writes can continue before the // flush finishes. ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Flush()); // The second flush can recycle the first log. Sync points enforce the // full purge happens after choosing the log to recycle and before it is // renamed. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ {"DBImpl::CreateWAL:BeforeReuseWritableFile1", "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"}, {"DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge", "DBImpl::CreateWAL:BeforeReuseWritableFile2"}, }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::port::Thread thread([&]() { TEST_SYNC_POINT( "DBWALTest::FullPurgePreservesLogPendingReuse:PreFullPurge"); ASSERT_OK(db_->EnableFileDeletions(/*force=*/true)); TEST_SYNC_POINT( "DBWALTest::FullPurgePreservesLogPendingReuse:PostFullPurge"); }); ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Flush()); thread.join(); } } TEST_F(DBWALTest, GetSortedWalFiles) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); VectorLogPtr log_files; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); ASSERT_EQ(0, log_files.size()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); ASSERT_EQ(1, log_files.size()); } while (ChangeWalOptions()); } TEST_F(DBWALTest, GetCurrentWalFile) { do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); std::unique_ptr* bad_log_file = nullptr; ASSERT_NOK(dbfull()->GetCurrentWalFile(bad_log_file)); std::unique_ptr log_file; ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); // nothing has been written to the log yet ASSERT_EQ(log_file->StartSequence(), 0); 
ASSERT_EQ(log_file->SizeFileBytes(), 0); ASSERT_EQ(log_file->Type(), kAliveLogFile); ASSERT_GT(log_file->LogNumber(), 0); // add some data and verify that the file size actually moves foward ASSERT_OK(Put(0, "foo", "v1")); ASSERT_OK(Put(0, "foo2", "v2")); ASSERT_OK(Put(0, "foo3", "v3")); ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); ASSERT_EQ(log_file->StartSequence(), 0); ASSERT_GT(log_file->SizeFileBytes(), 0); ASSERT_EQ(log_file->Type(), kAliveLogFile); ASSERT_GT(log_file->LogNumber(), 0); // force log files to cycle and add some more data, then check if // log number moves forward ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); for (int i = 0; i < 10; i++) { ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); } ASSERT_OK(Put(0, "foo4", "v4")); ASSERT_OK(Put(0, "foo5", "v5")); ASSERT_OK(Put(0, "foo6", "v6")); ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); ASSERT_EQ(log_file->StartSequence(), 0); ASSERT_GT(log_file->SizeFileBytes(), 0); ASSERT_EQ(log_file->Type(), kAliveLogFile); ASSERT_GT(log_file->LogNumber(), 0); } while (ChangeWalOptions()); } TEST_F(DBWALTest, RecoveryWithLogDataForSomeCFs) { // Test for regression of WAL cleanup missing files that don't contain data // for every column family. do { CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); ASSERT_OK(Put(1, "foo", "v1")); ASSERT_OK(Put(1, "foo", "v2")); uint64_t earliest_log_nums[2]; for (int i = 0; i < 2; ++i) { if (i > 0) { ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); } VectorLogPtr log_files; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); if (log_files.size() > 0) { earliest_log_nums[i] = log_files[0]->LogNumber(); } else { earliest_log_nums[i] = std::numeric_limits::max(); } } // Check at least the first WAL was cleaned up during the recovery. 
ASSERT_LT(earliest_log_nums[0], earliest_log_nums[1]); } while (ChangeWalOptions()); } TEST_F(DBWALTest, RecoverWithLargeLog) { do { { Options options = CurrentOptions(); CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(Put(1, "big1", std::string(200000, '1'))); ASSERT_OK(Put(1, "big2", std::string(200000, '2'))); ASSERT_OK(Put(1, "small3", std::string(10, '3'))); ASSERT_OK(Put(1, "small4", std::string(10, '4'))); ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); } // Make sure that if we re-open with a small write buffer size that // we flush table files in the middle of a large log file. Options options; options.write_buffer_size = 100000; options = CurrentOptions(options); ReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3); ASSERT_EQ(std::string(200000, '1'), Get(1, "big1")); ASSERT_EQ(std::string(200000, '2'), Get(1, "big2")); ASSERT_EQ(std::string(10, '3'), Get(1, "small3")); ASSERT_EQ(std::string(10, '4'), Get(1, "small4")); ASSERT_GT(NumTableFilesAtLevel(0, 1), 1); } while (ChangeWalOptions()); } // In https://reviews.facebook.net/D20661 we change // recovery behavior: previously for each log file each column family // memtable was flushed, even it was empty. 
Now it's changed: // we try to create the smallest number of table files by merging // updates from multiple logs TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) { Options options = CurrentOptions(); options.write_buffer_size = 5000000; CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); // Since we will reopen DB with smaller write_buffer_size, // each key will go to new SST file ASSERT_OK(Put(1, Key(10), DummyString(1000000))); ASSERT_OK(Put(1, Key(10), DummyString(1000000))); ASSERT_OK(Put(1, Key(10), DummyString(1000000))); ASSERT_OK(Put(1, Key(10), DummyString(1000000))); ASSERT_OK(Put(3, Key(10), DummyString(1))); // Make 'dobrynia' to be flushed and new WAL file to be created ASSERT_OK(Put(2, Key(10), DummyString(7500000))); ASSERT_OK(Put(2, Key(1), DummyString(1))); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2])); { auto tables = ListTableFiles(env_, dbname_); ASSERT_EQ(tables.size(), static_cast(1)); // Make sure 'dobrynia' was flushed: check sst files amount ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); } // New WAL file ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(3, Key(10), DummyString(1))); ASSERT_OK(Put(3, Key(10), DummyString(1))); ASSERT_OK(Put(3, Key(10), DummyString(1))); options.write_buffer_size = 4096; options.arena_block_size = 4096; ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, options); { // No inserts => default is empty ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(0)); // First 4 keys goes to separate SSTs + 1 more SST for 2 smaller keys ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(5)); // 1 SST for big key + 1 SST for small one ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(2)); // 1 SST for all keys ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(1)); } } // In 
https://reviews.facebook.net/D20661 we change // recovery behavior: previously for each log file each column family // memtable was flushed, even it wasn't empty. Now it's changed: // we try to create the smallest number of table files by merging // updates from multiple logs TEST_F(DBWALTest, RecoverCheckFileAmount) { Options options = CurrentOptions(); options.write_buffer_size = 100000; options.arena_block_size = 4 * 1024; options.avoid_flush_during_recovery = false; CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options); ASSERT_OK(Put(0, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(2, Key(1), DummyString(1))); // Make 'nikitich' memtable to be flushed ASSERT_OK(Put(3, Key(10), DummyString(1002400))); ASSERT_OK(Put(3, Key(1), DummyString(1))); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3])); // 4 memtable are not flushed, 1 sst file { auto tables = ListTableFiles(env_, dbname_); ASSERT_EQ(tables.size(), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(1)); } // Memtable for 'nikitich' has flushed, new WAL file has opened // 4 memtable still not flushed // Write to new WAL file ASSERT_OK(Put(0, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(2, Key(1), DummyString(1))); // Fill up 'nikitich' one more time ASSERT_OK(Put(3, Key(10), DummyString(1002400))); // make it flush ASSERT_OK(Put(3, Key(1), DummyString(1))); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3])); // There are still 4 memtable not flushed, and 2 sst tables ASSERT_OK(Put(0, Key(1), DummyString(1))); ASSERT_OK(Put(1, Key(1), DummyString(1))); ASSERT_OK(Put(2, Key(1), DummyString(1))); { auto tables = ListTableFiles(env_, dbname_); ASSERT_EQ(tables.size(), static_cast(2)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(2)); } ReopenWithColumnFamilies({"default", "pikachu", "dobrynia", "nikitich"}, options); { std::vector 
table_files = ListTableFiles(env_, dbname_); // Check, that records for 'default', 'dobrynia' and 'pikachu' from // first, second and third WALs went to the same SST. // So, there is 6 SSTs: three for 'nikitich', one for 'default', one for // 'dobrynia', one for 'pikachu' ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"), static_cast(3)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "dobrynia"), static_cast(1)); ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), static_cast(1)); } } TEST_F(DBWALTest, SyncMultipleLogs) { const uint64_t kNumBatches = 2; const int kBatchSize = 1000; Options options = CurrentOptions(); options.create_if_missing = true; options.write_buffer_size = 4096; Reopen(options); WriteBatch batch; WriteOptions wo; wo.sync = true; for (uint64_t b = 0; b < kNumBatches; b++) { batch.Clear(); for (int i = 0; i < kBatchSize; i++) { ASSERT_OK(batch.Put(Key(i), DummyString(128))); } ASSERT_OK(dbfull()->Write(wo, &batch)); } ASSERT_OK(dbfull()->SyncWAL()); } // Github issue 1339. Prior the fix we read sequence id from the first log to // a local variable, then keep increase the variable as we replay logs, // ignoring actual sequence id of the records. This is incorrect if some writes // come with WAL disabled. 
TEST_F(DBWALTest, PartOfWritesWithWALDisabled) { std::unique_ptr fault_env( new FaultInjectionTestEnv(env_)); Options options = CurrentOptions(); options.env = fault_env.get(); options.disable_auto_compactions = true; WriteOptions wal_on, wal_off; wal_on.sync = true; wal_on.disableWAL = false; wal_off.disableWAL = true; CreateAndReopenWithCF({"dummy"}, options); ASSERT_OK(Put(1, "dummy", "d1", wal_on)); // seq id 1 ASSERT_OK(Put(1, "dummy", "d2", wal_off)); ASSERT_OK(Put(1, "dummy", "d3", wal_off)); ASSERT_OK(Put(0, "key", "v4", wal_on)); // seq id 4 ASSERT_OK(Flush(0)); ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5 ASSERT_EQ("v5", Get(0, "key")); ASSERT_OK(dbfull()->FlushWAL(false)); // Simulate a crash. fault_env->SetFilesystemActive(false); Close(); fault_env->ResetState(); ReopenWithColumnFamilies({"default", "dummy"}, options); // Prior to the fix, we may incorrectly recover "v5" with sequence id = 3. ASSERT_EQ("v5", Get(0, "key")); // Destroy DB before destruct fault_env. Destroy(options); } // // Test WAL recovery for the various modes available // class RecoveryTestHelper { public: // Number of WAL files to generate static constexpr int kWALFilesCount = 10; // Starting number for the WAL file name like 00010.log static constexpr int kWALFileOffset = 10; // Keys to be written per WAL file static constexpr int kKeysPerWALFile = 133; // Size of the value static constexpr int kValueSize = 96; // Create WAL files with values filled in static void FillData(DBWALTestBase* test, const Options& options, const size_t wal_count, size_t* count) { // Calling internal functions requires sanitized options. 
Options sanitized_options = SanitizeOptions(test->dbname_, options); const ImmutableDBOptions db_options(sanitized_options); *count = 0; std::shared_ptr table_cache = NewLRUCache(50, 0); FileOptions file_options; WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size); std::unique_ptr versions; std::unique_ptr wal_manager; WriteController write_controller; versions.reset(new VersionSet( test->dbname_, &db_options, file_options, table_cache.get(), &write_buffer_manager, &write_controller, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", options.daily_offpeak_time_utc, /*error_handler=*/nullptr, /*read_only=*/false)); wal_manager.reset( new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); std::unique_ptr current_log_writer; for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) { uint64_t current_log_number = j; std::string fname = LogFileName(test->dbname_, current_log_number); std::unique_ptr file_writer; ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(), fname, file_options, &file_writer, nullptr)); log::Writer* log_writer = new log::Writer(std::move(file_writer), current_log_number, db_options.recycle_log_file_num > 0, false, db_options.wal_compression); ASSERT_OK(log_writer->AddCompressionTypeRecord(WriteOptions())); current_log_writer.reset(log_writer); WriteBatch batch; for (int i = 0; i < kKeysPerWALFile; i++) { std::string key = "key" + std::to_string((*count)++); std::string value = test->DummyString(kValueSize); ASSERT_NE(current_log_writer.get(), nullptr); uint64_t seq = versions->LastSequence() + 1; batch.Clear(); ASSERT_OK(batch.Put(key, value)); WriteBatchInternal::SetSequence(&batch, seq); ASSERT_OK(current_log_writer->AddRecord( WriteOptions(), WriteBatchInternal::Contents(&batch))); versions->SetLastAllocatedSequence(seq); versions->SetLastPublishedSequence(seq); versions->SetLastSequence(seq); } } } // Recreate and fill the store with some data 
static size_t FillData(DBWALTestBase* test, Options* options) { options->create_if_missing = true; test->DestroyAndReopen(*options); test->Close(); size_t count = 0; FillData(test, *options, kWALFilesCount, &count); return count; } // Read back all the keys we wrote and return the number of keys found static size_t GetData(DBWALTestBase* test) { size_t count = 0; for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) { if (test->Get("key" + std::to_string(i)) != "NOT_FOUND") { ++count; } } return count; } // Manuall corrupt the specified WAL static void CorruptWAL(DBWALTestBase* test, const Options& options, const double off, const double len, const int wal_file_id, const bool trunc = false) { Env* env = options.env; std::string fname = LogFileName(test->dbname_, wal_file_id); uint64_t size; ASSERT_OK(env->GetFileSize(fname, &size)); ASSERT_GT(size, 0); #ifdef OS_WIN // Windows disk cache behaves differently. When we truncate // the original content is still in the cache due to the original // handle is still open. Generally, in Windows, one prohibits // shared access to files and it is not needed for WAL but we allow // it to induce corruption at various tests. 
test->Close(); #endif if (trunc) { ASSERT_OK( test::TruncateFile(env, fname, static_cast(size * off))); } else { ASSERT_OK(test::CorruptFile(env, fname, static_cast(size * off + 8), static_cast(size * len), false)); } } }; class DBWALTestWithParams : public DBWALTestBase, public ::testing::WithParamInterface< std::tuple> { public: DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {} }; INSTANTIATE_TEST_CASE_P( Wal, DBWALTestWithParams, ::testing::Combine(::testing::Bool(), ::testing::Range(0, 4, 1), ::testing::Range(RecoveryTestHelper::kWALFileOffset, RecoveryTestHelper::kWALFileOffset + RecoveryTestHelper::kWALFilesCount, 1), ::testing::Values(CompressionType::kNoCompression, CompressionType::kZSTD))); class DBWALTestWithParamsVaryingRecoveryMode : public DBWALTestBase, public ::testing::WithParamInterface< std::tuple> { public: DBWALTestWithParamsVaryingRecoveryMode() : DBWALTestBase("/db_wal_test_with_params_mode") {} }; INSTANTIATE_TEST_CASE_P( Wal, DBWALTestWithParamsVaryingRecoveryMode, ::testing::Combine( ::testing::Bool(), ::testing::Range(0, 4, 1), ::testing::Range(RecoveryTestHelper::kWALFileOffset, RecoveryTestHelper::kWALFileOffset + RecoveryTestHelper::kWALFilesCount, 1), ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords, WALRecoveryMode::kAbsoluteConsistency, WALRecoveryMode::kPointInTimeRecovery, WALRecoveryMode::kSkipAnyCorruptedRecords), ::testing::Values(CompressionType::kNoCompression, CompressionType::kZSTD))); // Test scope: // - We expect to open the data store when there is incomplete trailing writes // at the end of any of the logs // - We do not expect to open the data store for corruption TEST_P(DBWALTestWithParams, kTolerateCorruptedTailRecords) { bool trunc = std::get<0>(GetParam()); // Corruption style // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file // Fill data for testing Options options = CurrentOptions(); const size_t 
row_count = RecoveryTestHelper::FillData(this, &options); // test checksum failure or parsing RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, /*len%=*/.1, wal_file_id, trunc); options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords; if (trunc) { options.create_if_missing = false; ASSERT_OK(TryReopen(options)); const size_t recovered_row_count = RecoveryTestHelper::GetData(this); ASSERT_TRUE(corrupt_offset == 0 || recovered_row_count > 0); ASSERT_LT(recovered_row_count, row_count); } else { ASSERT_NOK(TryReopen(options)); } } // Test scope: // We don't expect the data store to be opened if there is any corruption // (leading, middle or trailing -- incomplete writes or corruption) TEST_P(DBWALTestWithParams, kAbsoluteConsistency) { // Verify clean slate behavior Options options = CurrentOptions(); const size_t row_count = RecoveryTestHelper::FillData(this, &options); options.create_if_missing = false; ASSERT_OK(TryReopen(options)); ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count); bool trunc = std::get<0>(GetParam()); // Corruption style // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file // WAL compression type CompressionType compression_type = std::get<3>(GetParam()); options.wal_compression = compression_type; if (trunc && corrupt_offset == 0) { return; } // fill with new date RecoveryTestHelper::FillData(this, &options); // corrupt the wal RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33, /*len%=*/.1, wal_file_id, trunc); // verify options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; options.create_if_missing = false; ASSERT_NOK(TryReopen(options)); } // Test scope: // We don't expect the data store to be opened if there is any inconsistency // between WAL and SST files TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) { Options options = CurrentOptions(); options.avoid_flush_during_recovery = true; // 
Create DB with multiple column families. CreateAndReopenWithCF({"one", "two"}, options); ASSERT_OK(Put(1, "key1", "val1")); ASSERT_OK(Put(2, "key2", "val2")); // Record the offset at this point Env* env = options.env; uint64_t wal_file_id = dbfull()->TEST_LogfileNumber(); std::string fname = LogFileName(dbname_, wal_file_id); uint64_t offset_to_corrupt; ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); ASSERT_GT(offset_to_corrupt, 0); ASSERT_OK(Put(1, "key3", "val3")); // Corrupt WAL at location of key3 ASSERT_OK(test::CorruptFile(env, fname, static_cast(offset_to_corrupt), 4, false)); ASSERT_OK(Put(2, "key4", "val4")); ASSERT_OK(Put(1, "key5", "val5")); ASSERT_OK(Flush(2)); // PIT recovery & verify options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; ASSERT_NOK(TryReopenWithColumnFamilies({"default", "one", "two"}, options)); } TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { Options options = CurrentOptions(); options.env = env_; options.track_and_verify_wals_in_manifest = true; // The following make sure there are two bg flush threads. 
options.max_background_jobs = 8; DestroyAndReopen(options); const std::string cf1_name("cf1"); CreateAndReopenWithCF({cf1_name}, options); assert(handles_.size() == 2); { dbfull()->TEST_LockMutex(); ASSERT_LE(2, dbfull()->GetBGJobLimits().max_flushes); dbfull()->TEST_UnlockMutex(); } ASSERT_OK(dbfull()->PauseBackgroundWork()); ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "foo", "value")); ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); ASSERT_OK(dbfull()->TEST_FlushMemTable( /*wait=*/false, /*allow_write_stall=*/true, handles_[1])); ASSERT_OK(db_->Put(WriteOptions(), "foo", "value")); ASSERT_OK(dbfull()->TEST_FlushMemTable( /*wait=*/false, /*allow_write_stall=*/true, handles_[0])); bool called = false; std::atomic bg_flush_threads{0}; std::atomic wal_synced{false}; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) { int cur = bg_flush_threads.load(); int desired = cur + 1; if (cur > 0 || !bg_flush_threads.compare_exchange_strong(cur, desired)) { while (!wal_synced.load()) { // Wait until the other bg flush thread finishes committing WAL sync // operation to the MANIFEST. } } }); SyncPoint::GetInstance()->SetCallBack( "DBImpl::FlushMemTableToOutputFile:CommitWal:1", [&](void* /*arg*/) { wal_synced.store(true); }); // This callback will be called when the first bg flush thread reaches the // point before entering the MANIFEST write queue after flushing the SST // file. // The purpose of the sync points here is to ensure both bg flush threads // finish computing `min_wal_number_to_keep` before any of them updates the // `log_number` for the column family that's being flushed. SyncPoint::GetInstance()->SetCallBack( "MemTableList::TryInstallMemtableFlushResults:AfterComputeMinWalToKeep", [&](void* /*arg*/) { dbfull()->mutex()->AssertHeld(); if (!called) { // We are the first bg flush thread in the MANIFEST write queue. 
// We set up the dependency between sync points for two threads that // will be executing the same code. // For the interleaving of events, see // https://github.com/facebook/rocksdb/pull/9715. // bg flush thread1 will release the db mutex while in the MANIFEST // write queue. In the meantime, bg flush thread2 locks db mutex and // computes the min_wal_number_to_keep (before thread1 writes to // MANIFEST thus before cf1->log_number is updated). Bg thread2 joins // the MANIFEST write queue afterwards and bg flush thread1 proceeds // with writing to MANIFEST. called = true; SyncPoint::GetInstance()->LoadDependency({ {"VersionSet::LogAndApply:WriteManifestStart", "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"}, {"DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2", "VersionSet::LogAndApply:WriteManifest"}, }); } else { // The other bg flush thread has already been in the MANIFEST write // queue, and we are after. TEST_SYNC_POINT( "DBWALTest::RaceInstallFlushResultsWithWalObsoletion:BgFlush2"); } }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(dbfull()->ContinueBackgroundWork()); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); ASSERT_TRUE(called); Close(); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); DB* db1 = nullptr; Status s = DB::OpenForReadOnly(options, dbname_, &db1); ASSERT_OK(s); assert(db1); delete db1; } TEST_F(DBWALTest, FixSyncWalOnObseletedWalWithNewManifestCausingMissingWAL) { Options options = CurrentOptions(); // Small size to force manifest creation options.max_manifest_file_size = 1; options.track_and_verify_wals_in_manifest = true; DestroyAndReopen(options); // Accumulate memtable m1 and create the 1st wal (i.e, 4.log) ASSERT_OK(Put(Key(1), "")); ASSERT_OK(Put(Key(2), "")); ASSERT_OK(Put(Key(3), "")); const std::string wal_file_path = db_->GetName() + "/000004.log"; // Coerce the following 
sequence of events: // (1) Flush() marks 4.log to be obsoleted, 8.log to be the latest (i.e, // active) log and release the lock // (2) SyncWAL() proceeds with the lock. It // creates a new manifest and syncs all the inactive wals before the latest // (i.e, active log), which is 4.log. Note that SyncWAL() is not aware of the // fact that 4.log has marked as to be obseleted. Such wal // sync will then add a WAL addition record of 4.log to the new manifest // without any special treatment. Prior to the fix, there is no WAL deletion // record to offset it. (3) BackgroundFlush() will eventually purge 4.log. bool wal_synced = false; SyncPoint::GetInstance()->SetCallBack( "FindObsoleteFiles::PostMutexUnlock", [&](void*) { ASSERT_OK(env_->FileExists(wal_file_path)); uint64_t pre_sync_wal_manifest_no = dbfull()->TEST_Current_Manifest_FileNo(); ASSERT_OK(db_->SyncWAL()); uint64_t post_sync_wal_manifest_no = dbfull()->TEST_Current_Manifest_FileNo(); bool new_manifest_created = post_sync_wal_manifest_no == pre_sync_wal_manifest_no + 1; ASSERT_TRUE(new_manifest_created); wal_synced = true; }); SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(Flush()); ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); ASSERT_TRUE(wal_synced); // BackgroundFlush() purged 4.log // because the memtable associated with the WAL was flushed and new WAL was // created (i.e, 8.log) ASSERT_TRUE(env_->FileExists(wal_file_path).IsNotFound()); SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->DisableProcessing(); // To verify the corruption of "Missing WAL with log number: 4" under // `options.track_and_verify_wals_in_manifest = true` is fixed. // // Before the fix, `db_->SyncWAL()` will sync and record WAL addtion of the // obseleted WAL 4.log in a new manifest without any special treament. // This will result in missing-wal corruption in DB::Reopen(). 
Status s = TryReopen(options); EXPECT_OK(s); } // Test scope: // - We expect to open data store under all circumstances // - We expect only data upto the point where the first error was encountered TEST_P(DBWALTestWithParams, kPointInTimeRecovery) { const int maxkeys = RecoveryTestHelper::kWALFilesCount * RecoveryTestHelper::kKeysPerWALFile; bool trunc = std::get<0>(GetParam()); // Corruption style // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file // WAL compression type CompressionType compression_type = std::get<3>(GetParam()); // Fill data for testing Options options = CurrentOptions(); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); // Corrupt the wal // The offset here was 0.3 which cuts off right at the end of a // valid fragment after wal zstd compression checksum is enabled, // so changed the value to 0.33. RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .33, /*len%=*/.1, wal_file_id, trunc); // Verify options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; options.create_if_missing = false; ASSERT_OK(TryReopen(options)); // Probe data for invariants size_t recovered_row_count = RecoveryTestHelper::GetData(this); ASSERT_LT(recovered_row_count, row_count); // Verify a prefix of keys were recovered. But not in the case of full WAL // truncation, because we have no way to know there was a corruption when // truncation happened on record boundaries (preventing recovery holes in // that case requires using `track_and_verify_wals_in_manifest`). 
if (!trunc || corrupt_offset != 0) { bool expect_data = true; for (size_t k = 0; k < maxkeys; ++k) { bool found = Get("key" + std::to_string(k)) != "NOT_FOUND"; if (expect_data && !found) { expect_data = false; } ASSERT_EQ(found, expect_data); } } const size_t min = RecoveryTestHelper::kKeysPerWALFile * (wal_file_id - RecoveryTestHelper::kWALFileOffset); ASSERT_GE(recovered_row_count, min); if (!trunc && corrupt_offset != 0) { const size_t max = RecoveryTestHelper::kKeysPerWALFile * (wal_file_id - RecoveryTestHelper::kWALFileOffset + 1); ASSERT_LE(recovered_row_count, max); } } // Test scope: // - We expect to open the data store under all scenarios // - We expect to have recovered records past the corruption zone TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) { bool trunc = std::get<0>(GetParam()); // Corruption style // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file // WAL compression type CompressionType compression_type = std::get<3>(GetParam()); // Fill data for testing Options options = CurrentOptions(); options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); // Corrupt the WAL RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, /*len%=*/.1, wal_file_id, trunc); // Verify behavior options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords; options.create_if_missing = false; ASSERT_OK(TryReopen(options)); // Probe data for invariants size_t recovered_row_count = RecoveryTestHelper::GetData(this); ASSERT_LT(recovered_row_count, row_count); if (!trunc) { ASSERT_TRUE(corrupt_offset != 0 || recovered_row_count > 0); } } TEST_F(DBWALTest, AvoidFlushDuringRecovery) { Options options = CurrentOptions(); options.disable_auto_compactions = true; options.avoid_flush_during_recovery = false; // Test with flush after recovery. 
Reopen(options); ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("bar", "v2")); ASSERT_OK(Flush()); ASSERT_OK(Put("foo", "v3")); ASSERT_OK(Put("bar", "v4")); ASSERT_EQ(1, TotalTableFiles()); // Reopen DB. Check if WAL logs flushed. Reopen(options); ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v4", Get("bar")); ASSERT_EQ(2, TotalTableFiles()); // Test without flush after recovery. options.avoid_flush_during_recovery = true; DestroyAndReopen(options); ASSERT_OK(Put("foo", "v5")); ASSERT_OK(Put("bar", "v6")); ASSERT_OK(Flush()); ASSERT_OK(Put("foo", "v7")); ASSERT_OK(Put("bar", "v8")); ASSERT_EQ(1, TotalTableFiles()); // Reopen DB. WAL logs should not be flushed this time. Reopen(options); ASSERT_EQ("v7", Get("foo")); ASSERT_EQ("v8", Get("bar")); ASSERT_EQ(1, TotalTableFiles()); // Force flush with allow_2pc. options.avoid_flush_during_recovery = true; options.allow_2pc = true; ASSERT_OK(Put("foo", "v9")); ASSERT_OK(Put("bar", "v10")); ASSERT_OK(Flush()); ASSERT_OK(Put("foo", "v11")); ASSERT_OK(Put("bar", "v12")); Reopen(options); ASSERT_EQ("v11", Get("foo")); ASSERT_EQ("v12", Get("bar")); ASSERT_EQ(3, TotalTableFiles()); } TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) { // Verifies WAL files that were present during recovery, but not flushed due // to avoid_flush_during_recovery, will be considered for deletion at a later // stage. We check at least one such file is deleted during Flush(). 
Options options = CurrentOptions(); options.disable_auto_compactions = true; options.avoid_flush_during_recovery = true; Reopen(options); ASSERT_OK(Put("foo", "v1")); Reopen(options); for (int i = 0; i < 2; ++i) { if (i > 0) { // Flush() triggers deletion of obsolete tracked files ASSERT_OK(Flush()); } VectorLogPtr log_files; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files)); if (i == 0) { ASSERT_GT(log_files.size(), 0); } else { ASSERT_EQ(0, log_files.size()); } } } TEST_F(DBWALTest, RecoverWithoutFlush) { Options options = CurrentOptions(); options.avoid_flush_during_recovery = true; options.create_if_missing = false; options.disable_auto_compactions = true; options.write_buffer_size = 64 * 1024 * 1024; size_t count = RecoveryTestHelper::FillData(this, &options); auto validateData = [this, count]() { for (size_t i = 0; i < count; i++) { ASSERT_NE(Get("key" + std::to_string(i)), "NOT_FOUND"); } }; Reopen(options); validateData(); // Insert some data without flush ASSERT_OK(Put("foo", "foo_v1")); ASSERT_OK(Put("bar", "bar_v1")); Reopen(options); validateData(); ASSERT_EQ(Get("foo"), "foo_v1"); ASSERT_EQ(Get("bar"), "bar_v1"); // Insert again and reopen ASSERT_OK(Put("foo", "foo_v2")); ASSERT_OK(Put("bar", "bar_v2")); Reopen(options); validateData(); ASSERT_EQ(Get("foo"), "foo_v2"); ASSERT_EQ(Get("bar"), "bar_v2"); // manual flush and insert again ASSERT_OK(Flush()); ASSERT_EQ(Get("foo"), "foo_v2"); ASSERT_EQ(Get("bar"), "bar_v2"); ASSERT_OK(Put("foo", "foo_v3")); ASSERT_OK(Put("bar", "bar_v3")); Reopen(options); validateData(); ASSERT_EQ(Get("foo"), "foo_v3"); ASSERT_EQ(Get("bar"), "bar_v3"); } TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) { const std::string kSmallValue = "v"; const std::string kLargeValue = DummyString(1024); Options options = CurrentOptions(); options.avoid_flush_during_recovery = true; options.create_if_missing = false; options.disable_auto_compactions = true; auto countWalFiles = [this]() { VectorLogPtr log_files; if 
(!dbfull()->GetSortedWalFiles(log_files).ok()) { return size_t{0}; } return log_files.size(); }; // Create DB with multiple column families and multiple log files. CreateAndReopenWithCF({"one", "two"}, options); ASSERT_OK(Put(0, "key1", kSmallValue)); ASSERT_OK(Put(1, "key2", kLargeValue)); ASSERT_OK(Flush(1)); ASSERT_EQ(1, countWalFiles()); ASSERT_OK(Put(0, "key3", kSmallValue)); ASSERT_OK(Put(2, "key4", kLargeValue)); ASSERT_OK(Flush(2)); ASSERT_EQ(2, countWalFiles()); // Reopen, insert and flush. options.db_write_buffer_size = 64 * 1024 * 1024; ReopenWithColumnFamilies({"default", "one", "two"}, options); ASSERT_EQ(Get(0, "key1"), kSmallValue); ASSERT_EQ(Get(1, "key2"), kLargeValue); ASSERT_EQ(Get(0, "key3"), kSmallValue); ASSERT_EQ(Get(2, "key4"), kLargeValue); // Insert more data. ASSERT_OK(Put(0, "key5", kLargeValue)); ASSERT_OK(Put(1, "key6", kLargeValue)); ASSERT_EQ(3, countWalFiles()); ASSERT_OK(Flush(1)); ASSERT_OK(Put(2, "key7", kLargeValue)); ASSERT_OK(dbfull()->FlushWAL(false)); ASSERT_EQ(4, countWalFiles()); // Reopen twice and validate. for (int i = 0; i < 2; i++) { ReopenWithColumnFamilies({"default", "one", "two"}, options); ASSERT_EQ(Get(0, "key1"), kSmallValue); ASSERT_EQ(Get(1, "key2"), kLargeValue); ASSERT_EQ(Get(0, "key3"), kSmallValue); ASSERT_EQ(Get(2, "key4"), kLargeValue); ASSERT_EQ(Get(0, "key5"), kLargeValue); ASSERT_EQ(Get(1, "key6"), kLargeValue); ASSERT_EQ(Get(2, "key7"), kLargeValue); ASSERT_EQ(4, countWalFiles()); } } // In this test we are trying to do the following: // 1. Create a DB with corrupted WAL log; // 2. Open with avoid_flush_during_recovery = true; // 3. Append more data without flushing, which creates new WAL log. // 4. Open again. See if it can correctly handle previous corruption. 
TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
       RecoverFromCorruptedWALWithoutFlush) {
  const int kAppendKeys = 100;
  Options options = CurrentOptions();
  options.avoid_flush_during_recovery = true;
  options.create_if_missing = false;
  options.disable_auto_compactions = true;
  options.write_buffer_size = 64 * 1024 * 1024;

  // Snapshots the whole DB as an ordered list of (key, value) pairs so two
  // snapshots can be compared with ASSERT_EQ.
  auto getAll = [this]() {
    std::vector<std::pair<std::string, std::string>> data;
    ReadOptions ropt;
    // Owned by unique_ptr so the iterator is released on every path.
    std::unique_ptr<Iterator> iter(dbfull()->NewIterator(ropt));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      data.emplace_back(iter->key().ToString(), iter->value().ToString());
    }
    EXPECT_OK(iter->status());
    return data;
  };

  // Test parameters.
  bool trunc = std::get<0>(GetParam());  // Corruption style
  // Corruption offset position
  int corrupt_offset = std::get<1>(GetParam());
  int wal_file_id = std::get<2>(GetParam());  // WAL file
  WALRecoveryMode recovery_mode = std::get<3>(GetParam());
  // WAL compression type
  CompressionType compression_type = std::get<4>(GetParam());

  options.wal_recovery_mode = recovery_mode;
  options.wal_compression = compression_type;

  // Create corrupted WAL
  RecoveryTestHelper::FillData(this, &options);
  RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,
                                 /*len%=*/.1, wal_file_id, trunc);

  // Skip the test if DB won't open. A failed open is only acceptable for the
  // recovery modes checked below.
  if (!TryReopen(options).ok()) {
    ASSERT_TRUE(options.wal_recovery_mode ==
                    WALRecoveryMode::kAbsoluteConsistency ||
                (!trunc && options.wal_recovery_mode ==
                               WALRecoveryMode::kTolerateCorruptedTailRecords));
    return;
  }
  ASSERT_OK(TryReopen(options));

  // Append some more data.
  for (int k = 0; k < kAppendKeys; k++) {
    std::string key = "extra_key" + std::to_string(k);
    std::string value = DummyString(RecoveryTestHelper::kValueSize);
    ASSERT_OK(Put(key, value));
  }

  // Save data for comparison.
  auto data = getAll();

  // Reopen. Verify data.
  ASSERT_OK(TryReopen(options));
  auto actual_data = getAll();
  ASSERT_EQ(data, actual_data);
}

// Tests that total log size is recovered if we set
// avoid_flush_during_recovery=true.
// Flush should trigger if max_total_wal_size is reached. TEST_F(DBWALTest, RestoreTotalLogSizeAfterRecoverWithoutFlush) { auto test_listener = std::make_shared(); test_listener->expected_flush_reason = FlushReason::kWalFull; constexpr size_t kKB = 1024; constexpr size_t kMB = 1024 * 1024; Options options = CurrentOptions(); options.avoid_flush_during_recovery = true; options.max_total_wal_size = 1 * kMB; options.listeners.push_back(test_listener); // Have to open DB in multi-CF mode to trigger flush when // max_total_wal_size is reached. CreateAndReopenWithCF({"one"}, options); // Write some keys and we will end up with one log file which is slightly // smaller than 1MB. std::string value_100k(100 * kKB, 'v'); std::string value_300k(300 * kKB, 'v'); ASSERT_OK(Put(0, "foo", "v1")); for (int i = 0; i < 9; i++) { ASSERT_OK(Put(1, "key" + std::to_string(i), value_100k)); } // Get log files before reopen. VectorLogPtr log_files_before; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); ASSERT_EQ(1, log_files_before.size()); uint64_t log_size_before = log_files_before[0]->SizeFileBytes(); ASSERT_GT(log_size_before, 900 * kKB); ASSERT_LT(log_size_before, 1 * kMB); ReopenWithColumnFamilies({"default", "one"}, options); // Write one more value to make log larger than 1MB. ASSERT_OK(Put(1, "bar", value_300k)); // Get log files again. A new log file will be opened. VectorLogPtr log_files_after_reopen; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after_reopen)); ASSERT_EQ(2, log_files_after_reopen.size()); ASSERT_EQ(log_files_before[0]->LogNumber(), log_files_after_reopen[0]->LogNumber()); ASSERT_GT(log_files_after_reopen[0]->SizeFileBytes() + log_files_after_reopen[1]->SizeFileBytes(), 1 * kMB); // Write one more key to trigger flush. ASSERT_OK(Put(0, "foo", "v2")); for (auto* h : handles_) { ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(h)); } // Flushed two column families. 
ASSERT_EQ(2, test_listener->count.load()); } #if defined(ROCKSDB_PLATFORM_POSIX) #if defined(ROCKSDB_FALLOCATE_PRESENT) // Tests that we will truncate the preallocated space of the last log from // previous. TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithoutFlush) { constexpr size_t kKB = 1024; Options options = CurrentOptions(); options.env = env_; options.avoid_flush_during_recovery = true; if (mem_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); return; } if (!IsFallocateSupported()) { return; } DestroyAndReopen(options); size_t preallocated_size = dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); ASSERT_OK(Put("foo", "v1")); VectorLogPtr log_files_before; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); ASSERT_EQ(1, log_files_before.size()); auto& file_before = log_files_before[0]; ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB); // The log file has preallocated space. ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()), preallocated_size); Reopen(options); VectorLogPtr log_files_after; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after)); ASSERT_EQ(1, log_files_after.size()); ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB); // The preallocated space should be truncated. ASSERT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()), preallocated_size); } // Tests that we will truncate the preallocated space of the last log from // previous. 
TEST_F(DBWALTest, TruncateLastLogAfterRecoverWithFlush) { constexpr size_t kKB = 1024; Options options = CurrentOptions(); options.env = env_; options.avoid_flush_during_recovery = false; options.avoid_flush_during_shutdown = true; if (mem_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); return; } if (!IsFallocateSupported()) { return; } DestroyAndReopen(options); size_t preallocated_size = dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); ASSERT_OK(Put("foo", "v1")); VectorLogPtr log_files_before; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); ASSERT_EQ(1, log_files_before.size()); auto& file_before = log_files_before[0]; ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB); ASSERT_GE(GetAllocatedFileSize(dbname_ + file_before->PathName()), preallocated_size); // The log file has preallocated space. Close(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::PurgeObsoleteFiles:Begin", "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"}, {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate", "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}}); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); port::Thread reopen_thread([&]() { Reopen(options); }); TEST_SYNC_POINT( "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"); // After the flush during Open, the log file should get deleted. However, // if the process is in a crash loop, the log file may not get // deleted and thte preallocated space will keep accumulating. So we need // to ensure it gets trtuncated. 
EXPECT_LT(GetAllocatedFileSize(dbname_ + file_before->PathName()), preallocated_size); TEST_SYNC_POINT( "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate"); reopen_thread.join(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(DBWALTest, TruncateLastLogAfterRecoverWALEmpty) { Options options = CurrentOptions(); options.env = env_; options.avoid_flush_during_recovery = false; if (mem_env_ || encrypted_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem/non-encrypted environment"); return; } if (!IsFallocateSupported()) { return; } DestroyAndReopen(options); size_t preallocated_size = dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); Close(); std::vector filenames; std::string last_log; uint64_t last_log_num = 0; ASSERT_OK(env_->GetChildren(dbname_, &filenames)); for (const auto& fname : filenames) { uint64_t number; FileType type; if (ParseFileName(fname, &number, &type, nullptr)) { if (type == kWalFile && number > last_log_num) { last_log = fname; } } } ASSERT_NE(last_log, ""); last_log = dbname_ + '/' + last_log; ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"DBImpl::PurgeObsoleteFiles:Begin", "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"}, {"DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate", "DBImpl::DeleteObsoleteFileImpl::BeforeDeletion"}}); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "PosixWritableFile::Close", [](void* arg) { *(static_cast(arg)) = 0; }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); // Preallocate space for the empty log file. This could happen if WAL data // was buffered in memory and the process crashed. 
std::unique_ptr log_file; ASSERT_OK(env_->ReopenWritableFile(last_log, &log_file, EnvOptions())); log_file->SetPreallocationBlockSize(preallocated_size); log_file->PrepareWrite(0, 4096); log_file.reset(); ASSERT_GE(GetAllocatedFileSize(last_log), preallocated_size); port::Thread reopen_thread([&]() { Reopen(options); }); TEST_SYNC_POINT( "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterRecover"); // The preallocated space should be truncated. EXPECT_LT(GetAllocatedFileSize(last_log), preallocated_size); TEST_SYNC_POINT( "DBWALTest::TruncateLastLogAfterRecoverWithFlush:AfterTruncate"); reopen_thread.join(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); } TEST_F(DBWALTest, ReadOnlyRecoveryNoTruncate) { constexpr size_t kKB = 1024; Options options = CurrentOptions(); options.env = env_; options.avoid_flush_during_recovery = true; if (mem_env_) { ROCKSDB_GTEST_SKIP("Test requires non-mem environment"); return; } if (!IsFallocateSupported()) { return; } // create DB and close with file truncate disabled std::atomic_bool enable_truncate{false}; SyncPoint::GetInstance()->SetCallBack( "PosixWritableFile::Close", [&](void* arg) { if (!enable_truncate) { *(static_cast(arg)) = 0; } }); SyncPoint::GetInstance()->EnableProcessing(); DestroyAndReopen(options); size_t preallocated_size = dbfull()->TEST_GetWalPreallocateBlockSize(options.write_buffer_size); ASSERT_OK(Put("foo", "v1")); VectorLogPtr log_files_before; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_before)); ASSERT_EQ(1, log_files_before.size()); auto& file_before = log_files_before[0]; ASSERT_LT(file_before->SizeFileBytes(), 1 * kKB); // The log file has preallocated space. auto db_size = GetAllocatedFileSize(dbname_ + file_before->PathName()); ASSERT_GE(db_size, preallocated_size); Close(); // enable truncate and open DB as readonly, the file should not be truncated // and DB size is not changed. 
enable_truncate = true; ASSERT_OK(ReadOnlyReopen(options)); VectorLogPtr log_files_after; ASSERT_OK(dbfull()->GetSortedWalFiles(log_files_after)); ASSERT_EQ(1, log_files_after.size()); ASSERT_LT(log_files_after[0]->SizeFileBytes(), 1 * kKB); ASSERT_EQ(log_files_after[0]->PathName(), file_before->PathName()); // The preallocated space should NOT be truncated. // the DB size is almost the same. ASSERT_NEAR(GetAllocatedFileSize(dbname_ + file_before->PathName()), db_size, db_size / 100); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } #endif // ROCKSDB_FALLOCATE_PRESENT #endif // ROCKSDB_PLATFORM_POSIX TEST_F(DBWALTest, WalInManifestButNotInSortedWals) { Options options = CurrentOptions(); options.track_and_verify_wals_in_manifest = true; options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; // Build a way to make wal files selectively go missing bool wals_go_missing = false; struct MissingWalFs : public FileSystemWrapper { MissingWalFs(const std::shared_ptr& t, bool* _wals_go_missing_flag) : FileSystemWrapper(t), wals_go_missing_flag(_wals_go_missing_flag) {} bool* wals_go_missing_flag; IOStatus GetChildren(const std::string& dir, const IOOptions& io_opts, std::vector* r, IODebugContext* dbg) override { IOStatus s = target_->GetChildren(dir, io_opts, r, dbg); if (s.ok() && *wals_go_missing_flag) { for (size_t i = 0; i < r->size();) { if (EndsWith(r->at(i), ".log")) { r->erase(r->begin() + i); } else { ++i; } } } return s; } const char* Name() const override { return "MissingWalFs"; } }; auto my_fs = std::make_shared(env_->GetFileSystem(), &wals_go_missing); std::unique_ptr my_env(NewCompositeEnv(my_fs)); options.env = my_env.get(); CreateAndReopenWithCF({"blah"}, options); // Currently necessary to get a WAL tracked in manifest; see // https://github.com/facebook/rocksdb/issues/10080 ASSERT_OK(Put(0, "x", "y")); ASSERT_OK(db_->SyncWAL()); ASSERT_OK(Put(1, "x", "y")); ASSERT_OK(db_->SyncWAL()); 
ASSERT_OK(Flush(1)); ASSERT_FALSE(dbfull()->GetVersionSet()->GetWalSet().GetWals().empty()); std::vector> wals; ASSERT_OK(db_->GetSortedWalFiles(wals)); wals_go_missing = true; ASSERT_NOK(db_->GetSortedWalFiles(wals)); wals_go_missing = false; Close(); } TEST_F(DBWALTest, WalTermTest) { Options options = CurrentOptions(); options.env = env_; CreateAndReopenWithCF({"pikachu"}, options); ASSERT_OK(Put(1, "foo", "bar")); WriteOptions wo; wo.sync = true; wo.disableWAL = false; WriteBatch batch; ASSERT_OK(batch.Put("foo", "bar")); batch.MarkWalTerminationPoint(); ASSERT_OK(batch.Put("foo2", "bar2")); ASSERT_OK(dbfull()->Write(wo, &batch)); // make sure we can re-open it. ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); ASSERT_EQ("bar", Get(1, "foo")); ASSERT_EQ("NOT_FOUND", Get(1, "foo2")); } TEST_F(DBWALTest, GetCompressedWalsAfterSync) { if (db_->GetOptions().wal_compression == kNoCompression) { ROCKSDB_GTEST_BYPASS("stream compression not present"); return; } Options options = GetDefaultOptions(); options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; options.create_if_missing = true; options.env = env_; options.avoid_flush_during_recovery = true; options.track_and_verify_wals_in_manifest = true; // Enable WAL compression so that the newly-created WAL will be non-empty // after DB open, even if point-in-time WAL recovery encounters no // corruption. options.wal_compression = kZSTD; DestroyAndReopen(options); // Write something to memtable and WAL so that log_empty_ will be false after // next DB::Open(). ASSERT_OK(Put("a", "v")); Reopen(options); // New WAL is created, thanks to !log_empty_. ASSERT_OK(dbfull()->TEST_SwitchWAL()); ASSERT_OK(Put("b", "v")); ASSERT_OK(db_->SyncWAL()); VectorLogPtr wals; Status s = dbfull()->GetSortedWalFiles(wals); ASSERT_OK(s); } TEST_F(DBWALTest, EmptyWalReopenTest) { Options options = CurrentOptions(); options.env = env_; CreateAndReopenWithCF({"pikachu"}, options); // make sure we can re-open it. 
ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, options)); { std::vector files; int num_wal_files = 0; ASSERT_OK(env_->GetChildren(dbname_, &files)); for (const auto& file : files) { uint64_t number = 0; FileType type = kWalFile; if (ParseFileName(file, &number, &type) && type == kWalFile) { num_wal_files++; } } ASSERT_EQ(num_wal_files, 1); } } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }