rocksdb/utilities/fault_injection_env.h
Zichen Zhu 65893ad959 Explicitly closing all directory file descriptors (#10049)
Summary:
Currently, the DB directory file descriptor is left open until the deconstruction process (`DB::Close()` does not close the file descriptor). To verify this, comment out the lines between `db_ = nullptr` and `db_->Close()` (line 512, 513, 514, 515 in ldb_cmd.cc) to leak the ``db_'' object, build `ldb` tool and run
```
strace --trace=open,openat,close ./ldb --db=$TEST_TMPDIR --ignore_unknown_options put K1 V1 --create_if_missing
```
There is one directory file descriptor that is not closed in the strace log.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10049

Test Plan: Add a new unit test DBBasicTest.DBCloseAllDirectoryFDs: Open a database with different WAL directory and three different data directories, and all directory file descriptors should be closed after calling Close(). Explicitly call Close() after a directory file descriptor is not used so that the counter of directory open and close should be equivalent.

Reviewed By: ajkr, hx235

Differential Revision: D36722135

Pulled By: littlepig2013

fbshipit-source-id: 07bdc2abc417c6b30997b9bbef1f79aa757b21ff
2022-06-01 18:03:34 -07:00

260 lines
8.3 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright 2014 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// This test uses a custom Env to keep track of the state of a filesystem as of
// the last "sync". It then checks for data loss errors by purposely dropping
// file data (or entire files) not protected by a "sync".
#pragma once
#include <map>
#include <set>
#include <string>
#include "file/filename.h"
#include "rocksdb/env.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
class Random;
class TestWritableFile;
class FaultInjectionTestEnv;
struct FileState {
std::string filename_;
ssize_t pos_;
ssize_t pos_at_last_sync_;
ssize_t pos_at_last_flush_;
explicit FileState(const std::string& filename)
: filename_(filename),
pos_(-1),
pos_at_last_sync_(-1),
pos_at_last_flush_(-1) {}
FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {}
bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; }
Status DropUnsyncedData(Env* env) const;
Status DropRandomUnsyncedData(Env* env, Random* rand) const;
};
class TestRandomAccessFile : public RandomAccessFile {
public:
TestRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
FaultInjectionTestEnv* env);
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
Status Prefetch(uint64_t offset, size_t n) override;
Status MultiRead(ReadRequest* reqs, size_t num_reqs) override;
private:
std::unique_ptr<RandomAccessFile> target_;
FaultInjectionTestEnv* env_;
};
// A wrapper around WritableFileWriter* file
// is written to or sync'ed.
class TestWritableFile : public WritableFile {
public:
explicit TestWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>&& f,
FaultInjectionTestEnv* env);
virtual ~TestWritableFile();
virtual Status Append(const Slice& data) override;
virtual Status Append(
const Slice& data,
const DataVerificationInfo& /*verification_info*/) override {
return Append(data);
}
virtual Status Truncate(uint64_t size) override {
return target_->Truncate(size);
}
virtual Status Close() override;
virtual Status Flush() override;
virtual Status Sync() override;
virtual bool IsSyncThreadSafe() const override { return true; }
virtual Status PositionedAppend(const Slice& data,
uint64_t offset) override {
return target_->PositionedAppend(data, offset);
}
virtual Status PositionedAppend(
const Slice& data, uint64_t offset,
const DataVerificationInfo& /*verification_info*/) override {
return PositionedAppend(data, offset);
}
virtual bool use_direct_io() const override {
return target_->use_direct_io();
};
private:
FileState state_;
std::unique_ptr<WritableFile> target_;
bool writable_file_opened_;
FaultInjectionTestEnv* env_;
};
// A wrapper around WritableFileWriter* file
// is written to or sync'ed.
class TestRandomRWFile : public RandomRWFile {
public:
explicit TestRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>&& f,
FaultInjectionTestEnv* env);
virtual ~TestRandomRWFile();
Status Write(uint64_t offset, const Slice& data) override;
Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const override;
Status Close() override;
Status Flush() override;
Status Sync() override;
size_t GetRequiredBufferAlignment() const override {
return target_->GetRequiredBufferAlignment();
}
bool use_direct_io() const override { return target_->use_direct_io(); };
private:
std::unique_ptr<RandomRWFile> target_;
bool file_opened_;
FaultInjectionTestEnv* env_;
};
class TestDirectory : public Directory {
public:
explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname,
Directory* dir)
: env_(env), dirname_(dirname), dir_(dir) {}
~TestDirectory() {}
virtual Status Fsync() override;
virtual Status Close() override;
private:
FaultInjectionTestEnv* env_;
std::string dirname_;
std::unique_ptr<Directory> dir_;
};
class FaultInjectionTestEnv : public EnvWrapper {
public:
explicit FaultInjectionTestEnv(Env* base)
: EnvWrapper(base), filesystem_active_(true) {}
virtual ~FaultInjectionTestEnv() { error_.PermitUncheckedError(); }
static const char* kClassName() { return "FaultInjectionTestEnv"; }
const char* Name() const override { return kClassName(); }
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status ReopenWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override;
Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& soptions) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;
virtual Status DeleteFile(const std::string& f) override;
virtual Status RenameFile(const std::string& s,
const std::string& t) override;
virtual Status LinkFile(const std::string& s, const std::string& t) override;
// Undef to eliminate clash on Windows
#undef GetFreeSpace
virtual Status GetFreeSpace(const std::string& path,
uint64_t* disk_free) override {
if (!IsFilesystemActive() &&
error_.subcode() == IOStatus::SubCode::kNoSpace) {
*disk_free = 0;
return Status::OK();
} else {
return target()->GetFreeSpace(path, disk_free);
}
}
void WritableFileClosed(const FileState& state);
void WritableFileSynced(const FileState& state);
void WritableFileAppended(const FileState& state);
// For every file that is not fully synced, make a call to `func` with
// FileState of the file as the parameter.
Status DropFileData(std::function<Status(Env*, FileState)> func);
Status DropUnsyncedFileData();
Status DropRandomUnsyncedFileData(Random* rnd);
Status DeleteFilesCreatedAfterLastDirSync();
void ResetState();
void UntrackFile(const std::string& f);
void SyncDir(const std::string& dirname) {
MutexLock l(&mutex_);
dir_to_new_files_since_last_sync_.erase(dirname);
}
// Setting the filesystem to inactive is the test equivalent to simulating a
// system reset. Setting to inactive will freeze our saved filesystem state so
// that it will stop being recorded. It can then be reset back to the state at
// the time of the reset.
bool IsFilesystemActive() {
MutexLock l(&mutex_);
return filesystem_active_;
}
void SetFilesystemActiveNoLock(bool active,
Status error = Status::Corruption("Not active")) {
error.PermitUncheckedError();
filesystem_active_ = active;
if (!active) {
error_ = error;
}
error.PermitUncheckedError();
}
void SetFilesystemActive(bool active,
Status error = Status::Corruption("Not active")) {
error.PermitUncheckedError();
MutexLock l(&mutex_);
SetFilesystemActiveNoLock(active, error);
error.PermitUncheckedError();
}
void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
Status GetError() { return error_; }
private:
port::Mutex mutex_;
std::map<std::string, FileState> db_file_state_;
std::set<std::string> open_managed_files_;
std::unordered_map<std::string, std::set<std::string>>
dir_to_new_files_since_last_sync_;
bool filesystem_active_; // Record flushes, syncs, writes
Status error_;
};
} // namespace ROCKSDB_NAMESPACE