Try to repair db with wal_dir option, avoid leak some WAL files

Summary:
We should search wal_dir in Repairer::FindFiles function, and avoid use
LogFileNmae(dbname, number) to get WAL file's name, which will get a wrong
WAL filename. as following:

```
[WARN] [/home/liuchang/Workspace/rocksdb/db/repair.cc:310] Log #3: ignoring conversion error: IO error: While opening a file for sequentially reading: /tmp/rocksdbtest-1000/repair_test/000003.log: No such file or directory
```
  I have added a new test case to repair_test.cc, which try to repair db with all WAL options.

Signed-off-by: Chang Liu <liuchang0812@gmail.com>
Closes https://github.com/facebook/rocksdb/pull/2692

Differential Revision: D5575888

Pulled By: ajkr

fbshipit-source-id: 5b93e9f85cddc01663ccecd87631fa723ac466a3
This commit is contained in:
Chang Liu 2017-08-08 10:42:38 -07:00 committed by Facebook Github Bot
parent 36375de76f
commit d97a72d63f
3 changed files with 52 additions and 6 deletions

View file

@ -2298,7 +2298,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
// Delete log files in the WAL dir
for (const auto& file : walDirFiles) {
if (ParseFileName(file, &number, &type) && type == kLogFile) {
Status del = env->DeleteFile(soptions.wal_dir + "/" + file);
Status del = env->DeleteFile(LogFileName(soptions.wal_dir, number));
if (result.ok() && !del.ok()) {
result = del;
}

View file

@ -176,6 +176,7 @@ class Repairer {
status = db_impl->NewDB();
delete db_impl;
}
if (status.ok()) {
// Recover using the fresh manifest created by NewDB()
status =
@ -246,9 +247,21 @@ class Repairer {
Status FindFiles() {
std::vector<std::string> filenames;
bool found_file = false;
std::vector<std::string> to_search_paths;
for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
to_search_paths.push_back(db_options_.db_paths[path_id].path);
}
// search wal_dir if user uses a customize wal_dir
if (!db_options_.wal_dir.empty() &&
db_options_.wal_dir != dbname_) {
to_search_paths.push_back(db_options_.wal_dir);
}
for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
Status status =
env_->GetChildren(db_options_.db_paths[path_id].path, &filenames);
env_->GetChildren(to_search_paths[path_id], &filenames);
if (!status.ok()) {
return status;
}
@ -261,14 +274,12 @@ class Repairer {
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
if (type == kDescriptorFile) {
assert(path_id == 0);
manifests_.push_back(filenames[i]);
} else {
if (number + 1 > next_file_number_) {
next_file_number_ = number + 1;
}
if (type == kLogFile) {
assert(path_id == 0);
logs_.push_back(number);
} else if (type == kTableFile) {
table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
@ -288,7 +299,8 @@ class Repairer {
void ConvertLogFilesToTables() {
for (size_t i = 0; i < logs_.size(); i++) {
std::string logname = LogFileName(dbname_, logs_[i]);
// we should use LogFileName(wal_dir, logs_[i]) here. user might uses wal_dir option.
std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
@ -312,7 +324,7 @@ class Repairer {
};
// Open the log file
std::string logname = LogFileName(dbname_, log);
std::string logname = LogFileName(db_options_.wal_dir, log);
unique_ptr<SequentialFile> lfile;
Status status = env_->NewSequentialFile(
logname, &lfile, env_->OptimizeForLogRead(env_options_));

View file

@ -174,6 +174,40 @@ TEST_F(RepairTest, UnflushedSst) {
ASSERT_EQ(Get("key"), "val");
}
TEST_F(RepairTest, SeparateWalDir) {
do {
Options options = CurrentOptions();
DestroyAndReopen(options);
Put("key", "val");
Put("foo", "bar");
VectorLogPtr wal_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 1);
uint64_t total_ssts_size;
GetAllSSTFiles(&total_ssts_size);
ASSERT_EQ(total_ssts_size, 0);
std::string manifest_path =
DescriptorFileName(dbname_, dbfull()->TEST_Current_Manifest_FileNo());
Close();
ASSERT_OK(env_->FileExists(manifest_path));
ASSERT_OK(env_->DeleteFile(manifest_path));
ASSERT_OK(RepairDB(dbname_, options));
// make sure that all WALs are converted to SSTables.
options.wal_dir = "";
Reopen(options);
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
ASSERT_EQ(wal_files.size(), 0);
GetAllSSTFiles(&total_ssts_size);
ASSERT_GT(total_ssts_size, 0);
ASSERT_EQ(Get("key"), "val");
ASSERT_EQ(Get("foo"), "bar");
} while(ChangeWalOptions());
}
TEST_F(RepairTest, RepairMultipleColumnFamilies) {
// Verify repair logic associates SST files with their original column
// families.