From 55f7ded80d4418641b6174f23eae7d0d2be091b9 Mon Sep 17 00:00:00 2001 From: Merlin Mao Date: Fri, 23 Jul 2021 11:11:25 -0700 Subject: [PATCH] Checkpoint dir options fix (#8572) Summary: Originally the 2 options `db_log_dir` and `wal_dir` will be reused in a snapshot db since the options files are just copied. By default, if `wal_dir` was not set when a db was created, it is set to the db's dir. Therefore, the snapshot db will use the same WAL dir. If both the original db and the snapshot db write to or delete from the WAL dir, one may modify or delete files which belong to the other. The same applies to `db_log_dir` as well, but as info log files are not copied or linked, it is simpler for this option. 2 arguments are added to `Checkpoint::CreateCheckpoint()`, allowing to override these 2 options. `wal_dir`: If the function argument `wal_dir` is empty, or set to the original db location, or the checkpoint location, the snapshot's `wal_dir` option will be updated to the checkpoint location. Otherwise, the absolute path specified in the argument will be used. During checkpointing, live WAL files will be copied or linked the new location, instead of the current WAL dir specified in the original db. `db_log_dir`: Same as `wal_dir`, but no files will be copied or linked. A new unit test was added: `CheckpointTest.CheckpointWithOptionsDirsTest`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8572 Test Plan: New unit test ``` checkpoint_test --gtest_filter="CheckpointTest.CheckpointWithOptionsDirsTest" ``` Output ``` Note: Google Test filter = CheckpointTest.CheckpointWithOptionsDirsTest [==========] Running 1 test from 1 test case. [----------] Global test environment set-up. [----------] 1 test from CheckpointTest [ RUN ] CheckpointTest.CheckpointWithOptionsDirsTest [ OK ] CheckpointTest.CheckpointWithOptionsDirsTest (11712 ms) [----------] 1 test from CheckpointTest (11712 ms total) [----------] Global test environment tear-down [==========] 1 test from 1 test case ran. (11713 ms total) [ PASSED ] 1 test. ``` This test will fail without this patch. Just modify the code to remove the 2 arguments introduced in this patch in `CreateCheckpoint()`. Reviewed By: zhichao-cao Differential Revision: D29832761 Pulled By: autopear fbshipit-source-id: e6a639b4d674380df82998c0839e79cab695fe29 --- include/rocksdb/utilities/checkpoint.h | 6 +- utilities/checkpoint/checkpoint_impl.cc | 124 +++++++++++++++++--- utilities/checkpoint/checkpoint_impl.h | 9 +- utilities/checkpoint/checkpoint_test.cc | 149 ++++++++++++++++++++++++ 4 files changed, 268 insertions(+), 20 deletions(-) diff --git a/include/rocksdb/utilities/checkpoint.h b/include/rocksdb/utilities/checkpoint.h index df2a744033..f4f1813ce8 100644 --- a/include/rocksdb/utilities/checkpoint.h +++ b/include/rocksdb/utilities/checkpoint.h @@ -40,9 +40,13 @@ class Checkpoint { // sequence_number_ptr: if it is not nullptr, the value it points to will be // set to the DB's sequence number. The default value of this parameter is // nullptr. + // db_log_dir / wal_dir: override db_log_dir or wal_dir option in the + // snapshot. If empty, checkpoint_dir will be used. virtual Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush = 0, - uint64_t* sequence_number_ptr = nullptr); + uint64_t* sequence_number_ptr = nullptr, + const std::string& db_log_dir = "", + const std::string& wal_dir = ""); // Exports all live SST files of a specified Column Family onto export_dir, // returning SST files information in metadata. diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index f79b0f2ae1..e7351654ae 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -20,12 +20,14 @@ #include "db/wal_manager.h" #include "file/file_util.h" #include "file/filename.h" +#include "options/options_parser.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/metadata.h" #include "rocksdb/transaction_log.h" #include "rocksdb/utilities/checkpoint.h" +#include "rocksdb/utilities/options_util.h" #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/file_checksum_helper.h" @@ -39,7 +41,9 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) { Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/, uint64_t /*log_size_for_flush*/, - uint64_t* /*sequence_number_ptr*/) { + uint64_t* /*sequence_number_ptr*/, + const std::string& /*db_log_dir*/, + const std::string& /*wal_dir*/) { return Status::NotSupported(""); } @@ -76,7 +80,9 @@ Status Checkpoint::ExportColumnFamily( // Builds an openable snapshot of RocksDB Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush, - uint64_t* sequence_number_ptr) { + uint64_t* sequence_number_ptr, + const std::string& db_log_dir, + const std::string& wal_dir) { DBOptions db_options = db_->GetDBOptions(); Status s = db_->GetEnv()->FileExists(checkpoint_dir); @@ -101,15 +107,54 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, return Status::InvalidArgument("invalid checkpoint directory name"); } - std::string full_private_path = - checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp"; - ROCKS_LOG_INFO( - db_options.info_log, - "Snapshot process -- using temporary directory %s", - full_private_path.c_str()); + std::string parsed_checkpoint_dir = + checkpoint_dir.substr(0, final_nonslash_idx + 1); + std::string full_private_path = parsed_checkpoint_dir + ".tmp"; + ROCKS_LOG_INFO(db_options.info_log, + "Snapshot process -- using temporary directory %s", + full_private_path.c_str()); CleanStagingDirectory(full_private_path, db_options.info_log.get()); // create snapshot directory s = db_->GetEnv()->CreateDir(full_private_path); + + // Remove the last `/`s if needed + std::string parsed_log_dir = + db_log_dir.empty() + ? "" + : db_log_dir.substr(0, db_log_dir.find_last_not_of('/') + 1); + std::string parsed_wal_dir = + wal_dir.empty() ? "" + : wal_dir.substr(0, wal_dir.find_last_not_of('/') + 1); + + // Info log files are not copied or linked, just update the option value. + std::string value_log_dir = parsed_log_dir == db_->GetName() || + parsed_log_dir == parsed_checkpoint_dir + ? "" + : parsed_log_dir; + + // If the wal_dir is empty, or the same as the source db dir, update the + // option value to the checkpoint dir. + std::string value_wal_dir; // Option value to override + std::string new_wal_dir; // The target location to copy/link WAL files + if (parsed_wal_dir.empty() || parsed_wal_dir == db_->GetName() || + parsed_wal_dir == parsed_checkpoint_dir) { + value_wal_dir = parsed_checkpoint_dir; + new_wal_dir = full_private_path; // Copy to the temp dir + } else { + value_wal_dir = parsed_wal_dir; + std::string prefix = parsed_checkpoint_dir + "/"; + // If checkpoint_dir is parent of wal_dir, create the wal dir inside the tmp + // dir; otherwise, create it directly. + new_wal_dir = + parsed_wal_dir.rfind(prefix, 0) == 0 + ? full_private_path + "/" + parsed_wal_dir.substr(prefix.size()) + : parsed_wal_dir; + s = db_->GetEnv()->FileExists(new_wal_dir); + if (s.IsNotFound()) { + s = db_->GetEnv()->CreateDir(new_wal_dir); + } + } + uint64_t sequence_number = 0; if (s.ok()) { // enable file deletions @@ -120,21 +165,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, s = CreateCustomCheckpoint( db_options, [&](const std::string& src_dirname, const std::string& fname, - FileType) { + FileType type) { ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str()); - return db_->GetFileSystem()->LinkFile(src_dirname + fname, - full_private_path + fname, - IOOptions(), nullptr); + // WAL file links may be created in another location. + return db_->GetFileSystem()->LinkFile( + src_dirname + fname, + (type == kWalFile ? new_wal_dir : full_private_path) + fname, + IOOptions(), nullptr); } /* link_file_cb */, [&](const std::string& src_dirname, const std::string& fname, - uint64_t size_limit_bytes, FileType, + uint64_t size_limit_bytes, FileType type, const std::string& /* checksum_func_name */, const std::string& /* checksum_val */) { ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str()); - return CopyFile(db_->GetFileSystem(), src_dirname + fname, - full_private_path + fname, size_limit_bytes, - db_options.use_fsync); + if (type == kOptionsFile) { + // Modify and rewrite option files + return CopyOptionsFile(src_dirname + fname, + full_private_path + fname, value_log_dir, + value_wal_dir); + } else { + // Copy other files. WAL files may be copied to another location. + return Status(CopyFile( + db_->GetFileSystem(), src_dirname + fname, + (type == kWalFile ? new_wal_dir : full_private_path) + fname, + size_limit_bytes, db_options.use_fsync)); + } } /* copy_file_cb */, [&](const std::string& fname, const std::string& contents, FileType) { ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str()); @@ -154,11 +210,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, if (s.ok()) { // move tmp private backup to real snapshot directory - s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir); + s = db_->GetEnv()->RenameFile(full_private_path, parsed_checkpoint_dir); } if (s.ok()) { std::unique_ptr checkpoint_directory; - s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory); + s = db_->GetEnv()->NewDirectory(parsed_checkpoint_dir, + &checkpoint_directory); if (s.ok() && checkpoint_directory != nullptr) { s = checkpoint_directory->Fsync(); } @@ -588,6 +645,37 @@ Status CheckpointImpl::ExportFilesInMetaData( return s; } + +Status CheckpointImpl::CopyOptionsFile(const std::string& src_file, + const std::string& target_file, + const std::string& db_log_dir, + const std::string& wal_dir) { + Status s; + DBOptions src_db_options; + std::vector src_cf_descs; + s = LoadOptionsFromFile(ConfigOptions(), src_file, &src_db_options, + &src_cf_descs); + if (!s.ok()) { + return s; + } + + // Override these 2 options + src_db_options.db_log_dir = db_log_dir; + src_db_options.wal_dir = wal_dir; + + std::vector src_cf_names; + std::vector src_cf_opts; + src_cf_names.reserve(src_cf_descs.size()); + src_cf_opts.reserve(src_cf_descs.size()); + for (ColumnFamilyDescriptor desc : src_cf_descs) { + src_cf_names.push_back(desc.name); + src_cf_opts.push_back(desc.options); + } + + return PersistRocksDBOptions(src_db_options, src_cf_names, src_cf_opts, + target_file, db_->GetFileSystem()); +} + } // namespace ROCKSDB_NAMESPACE #endif // ROCKSDB_LITE diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h index ff9b8f4556..d4aa3a382b 100644 --- a/utilities/checkpoint/checkpoint_impl.h +++ b/utilities/checkpoint/checkpoint_impl.h @@ -20,7 +20,9 @@ class CheckpointImpl : public Checkpoint { Status CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush, - uint64_t* sequence_number_ptr) override; + uint64_t* sequence_number_ptr, + const std::string& db_log_dir, + const std::string& wal_dir) override; Status ExportColumnFamily(ColumnFamilyHandle* handle, const std::string& export_dir, @@ -57,6 +59,11 @@ class CheckpointImpl : public Checkpoint { const std::string& fname)> copy_file_cb); + Status CopyOptionsFile(const std::string& src_file, + const std::string& target_file, + const std::string& db_log_dir, + const std::string& wal_dir); + private: DB* db_; }; diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 82afe9fcea..17ab9606c5 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -14,16 +14,21 @@ #ifndef OS_WIN #include #endif +#include #include +#include #include #include +#include #include "db/db_impl/db_impl.h" #include "file/file_util.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/convenience.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/utilities/options_util.h" #include "rocksdb/utilities/transaction_db.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" @@ -259,6 +264,12 @@ class CheckpointTest : public testing::Test { } return result; } + + static std::string IntToFixedWidthString(size_t i, int len) { + std::stringstream ss; + ss << std::setw(len) << std::setfill('0') << i; + return ss.str(); + } }; TEST_F(CheckpointTest, GetSnapshotLink) { @@ -902,6 +913,144 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) { delete snapshot_db; } +TEST_F(CheckpointTest, CheckpointWithOptionsDirsTest) { + // If the checkpoint and the source db share the same wal_dir, files may be + // corrupted if both write to or delete from the same wal_dir. db_log_dir + // should also be updated during checkpointing, but it is less important since + // log files are not copied or linked to the checkpoint. + + // 8 bytes key, 1 kB record, 4 kB MemTable. Each batch should trigger 25 + // flushes + const int key_len = 8; + const int value_len = 1016; + const size_t num_keys = 100; + const size_t buffer_size = 4096; + + std::string value(value_len, ' '); + + std::vector dirs1 = {"", "", "", ""}; + std::vector dirs2 = {"/logs", "/wal", "", ""}; + std::vector dirs3 = {"/logs", "/wal", "/logs", "/wal"}; + + for (auto dirs : {dirs1, dirs2, dirs3}) { + std::string src_log_dir = dirs[0].empty() ? "" : dbname_ + dirs[0]; + std::string src_wal_dir = dirs[1].empty() ? "" : dbname_ + dirs[1]; + std::string snap_log_dir = dirs[2].empty() ? "" : snapshot_name_ + dirs[2]; + std::string snap_wal_dir = dirs[3].empty() ? "" : snapshot_name_ + dirs[3]; + + Options src_opts = CurrentOptions(); + WriteOptions w_opts; + ReadOptions r_opts; + DB* snapshotDB; + Checkpoint* checkpoint; + + src_opts = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, src_opts)); + + // Create a database + src_opts.create_if_missing = true; + src_opts.write_buffer_size = buffer_size; + src_opts.OptimizeUniversalStyleCompaction(buffer_size); + src_opts.db_log_dir = src_log_dir; + src_opts.wal_dir = src_wal_dir; + + ASSERT_OK(DB::Open(src_opts, dbname_, &db_)); + + // Write to src db + for (size_t i = 1; i <= num_keys; i++) { + ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value)); + } + + // Take a snapshot + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 0, nullptr, + snap_log_dir, snap_wal_dir)); + + // Write to src db again + for (size_t i = num_keys + 1; i <= num_keys * 2; i++) { + ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value)); + } + + std::string result; + std::string key1 = IntToFixedWidthString(num_keys, key_len); + std::string key2 = IntToFixedWidthString(num_keys * 2, key_len); + std::string key3 = IntToFixedWidthString(num_keys * 3, key_len); + + ASSERT_OK(db_->Get(r_opts, key1, &result)); + ASSERT_OK(db_->Get(r_opts, key2, &result)); + + // Open snapshot with its own options + DBOptions snap_opts; + std::vector snap_cfs; + ASSERT_OK(LoadLatestOptions(ConfigOptions(), snapshot_name_, &snap_opts, + &snap_cfs)); + + ASSERT_EQ(snap_opts.db_log_dir, snap_log_dir); + ASSERT_EQ(snap_opts.wal_dir, + snap_wal_dir.empty() ? snapshot_name_ : snap_wal_dir); + + std::vector handles; + ASSERT_OK( + DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB)); + for (ColumnFamilyHandle* handle : handles) { + delete handle; + } + handles.clear(); + + ASSERT_OK(snapshotDB->Get(r_opts, key1, &result)); + ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound()); + + // Write to snapshot + for (size_t i = num_keys * 2 + 1; i <= num_keys * 3; i++) { + ASSERT_OK( + snapshotDB->Put(w_opts, IntToFixedWidthString(i, key_len), value)); + } + + ASSERT_OK(snapshotDB->Get(r_opts, key3, &result)); + ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound()); + + // Close and reopen the snapshot + delete snapshotDB; + ASSERT_OK( + DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB)); + for (ColumnFamilyHandle* handle : handles) { + delete handle; + } + handles.clear(); + ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound()); + ASSERT_OK(snapshotDB->Get(r_opts, key3, &result)); + + delete snapshotDB; + + // Close and reopen the source db + delete db_; + src_opts.create_if_missing = false; + ASSERT_OK(DB::Open(src_opts, dbname_, &db_)); + ASSERT_OK(db_->Get(r_opts, key2, &result)); + ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound()); + delete db_; + + // Delete the snapshot + Options del_opts; + del_opts.db_log_dir = snap_opts.db_log_dir; + del_opts.wal_dir = snap_opts.wal_dir; + ASSERT_OK(DestroyDB(snapshot_name_, del_opts)); + + // Reopen the source db again + ASSERT_OK(DB::Open(src_opts, dbname_, &db_)); + + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, src_opts)); + + dbname_ = test::PerThreadDBPath(env_, "db_test"); + + delete checkpoint; + } +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) {