rocksdb/env/fs_posix.cc
akankshamahajan ae0f9c3339 Add new property in IOOptions to skip recursing through directories and list only files during GetChildren. (#10668)
Summary:
Add new property "do_not_recurse" in  IOOptions for underlying file system to skip iteration of directories during DB::Open if there are no sub directories and list only files.
By default this property is set to false. This property is set true currently in the code where RocksDB is sure only files are needed during DB::Open.

Provided support in PosixFileSystem to use "do_not_recurse".

TestPlan:
- Existing tests

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10668

Reviewed By: anand1976

Differential Revision: D39471683

Pulled By: akankshamahajan15

fbshipit-source-id: 90e32f0b86d5346d53bc2714d3a0e7002590527f
2022-10-03 10:59:45 -07:00

1295 lines
42 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors
#if !defined(OS_WIN)
#include <dirent.h>
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
#include <dlfcn.h>
#endif
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
#include <sys/statfs.h>
#include <sys/sysmacros.h>
#endif
#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <algorithm>
// Get nano time includes
#if defined(OS_LINUX) || defined(OS_FREEBSD)
#elif defined(__MACH__)
#include <Availability.h>
#include <mach/clock.h>
#include <mach/mach.h>
#else
#include <chrono>
#endif
#include <deque>
#include <set>
#include <vector>
#include "env/composite_env_wrapper.h"
#include "env/io_posix.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_updater.h"
#include "port/lang.h"
#include "port/port.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/compression_context_cache.h"
#include "util/random.h"
#include "util/string_util.h"
#include "util/thread_local.h"
#include "util/threadpool_imp.h"
#if !defined(TMPFS_MAGIC)
#define TMPFS_MAGIC 0x01021994
#endif
#if !defined(XFS_SUPER_MAGIC)
#define XFS_SUPER_MAGIC 0x58465342
#endif
#if !defined(EXT4_SUPER_MAGIC)
#define EXT4_SUPER_MAGIC 0xEF53
#endif
extern "C" bool RocksDbIOUringEnable() __attribute__((__weak__));
namespace ROCKSDB_NAMESPACE {
namespace {
inline mode_t GetDBFileMode(bool allow_non_owner_access) {
return allow_non_owner_access ? 0644 : 0600;
}
// list of pathnames that are locked
// Only used for error message.
struct LockHoldingInfo {
int64_t acquire_time;
uint64_t acquiring_thread;
};
static std::map<std::string, LockHoldingInfo> locked_files;
static port::Mutex mutex_locked_files;
static int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct flock f;
memset(&f, 0, sizeof(f));
f.l_type = (lock ? F_WRLCK : F_UNLCK);
f.l_whence = SEEK_SET;
f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file
int value = fcntl(fd, F_SETLK, &f);
return value;
}
class PosixFileLock : public FileLock {
public:
int fd_ = /*invalid*/ -1;
std::string filename;
void Clear() {
fd_ = -1;
filename.clear();
}
virtual ~PosixFileLock() override {
// Check for destruction without UnlockFile
assert(fd_ == -1);
}
};
int cloexec_flags(int flags, const EnvOptions* options) {
// If the system supports opening the file with cloexec enabled,
// do so, as this avoids a race condition if a db is opened around
// the same time that a child process is forked
#ifdef O_CLOEXEC
if (options == nullptr || options->set_fd_cloexec) {
flags |= O_CLOEXEC;
}
#else
(void)options;
#endif
return flags;
}
class PosixFileSystem : public FileSystem {
public:
PosixFileSystem();
static const char* kClassName() { return "PosixFileSystem"; }
const char* Name() const override { return kClassName(); }
const char* NickName() const override { return kDefaultName(); }
~PosixFileSystem() override {}
bool IsInstanceOf(const std::string& name) const override {
if (name == "posix") {
return true;
} else {
return FileSystem::IsInstanceOf(name);
}
}
void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
}
}
IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* /*dbg*/) override {
result->reset();
int fd = -1;
int flags = cloexec_flags(O_RDONLY, &options);
FILE* file = nullptr;
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef ROCKSDB_LITE
return IOStatus::IOError(fname,
"Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
flags |= O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewSequentialFile:O_DIRECT", &flags);
#endif
}
do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
return IOError("While opening a file for sequentially reading", fname,
errno);
}
SetFD_CLOEXEC(fd, &options);
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
return IOError("While fcntl NoCache", fname, errno);
}
#endif
} else {
do {
IOSTATS_TIMER_GUARD(open_nanos);
file = fdopen(fd, "r");
} while (file == nullptr && errno == EINTR);
if (file == nullptr) {
close(fd);
return IOError("While opening file for sequentially read", fname,
errno);
}
}
result->reset(new PosixSequentialFile(
fname, file, fd, GetLogicalBlockSizeForReadIfNeeded(options, fname, fd),
options));
return IOStatus::OK();
}
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* /*dbg*/) override {
result->reset();
IOStatus s = IOStatus::OK();
int fd;
int flags = cloexec_flags(O_RDONLY, &options);
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef ROCKSDB_LITE
return IOStatus::IOError(fname,
"Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
flags |= O_DIRECT;
TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
#endif
}
do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
s = IOError("While open a file for random read", fname, errno);
return s;
}
SetFD_CLOEXEC(fd, &options);
if (options.use_mmap_reads) {
// Use of mmap for random reads has been removed because it
// kills performance when storage is fast.
// Use mmap when virtual address-space is plentiful.
uint64_t size;
IOOptions opts;
s = GetFileSize(fname, opts, &size, nullptr);
if (s.ok()) {
void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
if (base != MAP_FAILED) {
result->reset(
new PosixMmapReadableFile(fd, fname, base, size, options));
} else {
s = IOError("while mmap file for read", fname, errno);
close(fd);
}
} else {
close(fd);
}
} else {
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
return IOError("while fcntl NoCache", fname, errno);
}
#endif
}
result->reset(new PosixRandomAccessFile(
fname, fd, GetLogicalBlockSizeForReadIfNeeded(options, fname, fd),
options
#if defined(ROCKSDB_IOURING_PRESENT)
,
!IsIOUringEnabled() ? nullptr : thread_local_io_urings_.get()
#endif
));
}
return s;
}
virtual IOStatus OpenWritableFile(const std::string& fname,
const FileOptions& options, bool reopen,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* /*dbg*/) {
result->reset();
IOStatus s;
int fd = -1;
int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) {
// Note: we should avoid O_APPEND here due to ta the following bug:
// POSIX requires that opening a file with the O_APPEND flag should
// have no affect on the location at which pwrite() writes data.
// However, on Linux, if a file is opened with O_APPEND, pwrite()
// appends data to the end of the file, regardless of the value of
// offset.
// More info here: https://linux.die.net/man/2/pwrite
#ifdef ROCKSDB_LITE
return IOStatus::IOError(fname,
"Direct I/O not supported in RocksDB lite");
#endif // ROCKSDB_LITE
flags |= O_WRONLY;
#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
flags |= O_DIRECT;
#endif
TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
} else if (options.use_mmap_writes) {
// non-direct I/O
flags |= O_RDWR;
} else {
flags |= O_WRONLY;
}
flags = cloexec_flags(flags, &options);
do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
s = IOError("While open a file for appending", fname, errno);
return s;
}
SetFD_CLOEXEC(fd, &options);
if (options.use_mmap_writes) {
MaybeForceDisableMmap(fd);
}
if (options.use_mmap_writes && !forceMmapOff_) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
} else if (options.use_direct_writes && !options.use_mmap_writes) {
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
s = IOError("While fcntl NoCache an opened file for appending", fname,
errno);
return s;
}
#elif defined(OS_SOLARIS)
if (directio(fd, DIRECTIO_ON) == -1) {
if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
close(fd);
s = IOError("While calling directio()", fname, errno);
return s;
}
}
#endif
result->reset(new PosixWritableFile(
fname, fd, GetLogicalBlockSizeForWriteIfNeeded(options, fname, fd),
options));
} else {
// disable mmap writes
EnvOptions no_mmap_writes_options = options;
no_mmap_writes_options.use_mmap_writes = false;
result->reset(
new PosixWritableFile(fname, fd,
GetLogicalBlockSizeForWriteIfNeeded(
no_mmap_writes_options, fname, fd),
no_mmap_writes_options));
}
return s;
}
IOStatus NewWritableFile(const std::string& fname, const FileOptions& options,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
return OpenWritableFile(fname, options, false, result, dbg);
}
IOStatus ReopenWritableFile(const std::string& fname,
const FileOptions& options,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
return OpenWritableFile(fname, options, true, result, dbg);
}
IOStatus ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
const FileOptions& options,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* /*dbg*/) override {
result->reset();
IOStatus s;
int fd = -1;
int flags = 0;
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) {
#ifdef ROCKSDB_LITE
return IOStatus::IOError(fname,
"Direct I/O not supported in RocksDB lite");
#endif // !ROCKSDB_LITE
flags |= O_WRONLY;
#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
flags |= O_DIRECT;
#endif
TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
} else if (options.use_mmap_writes) {
// mmap needs O_RDWR mode
flags |= O_RDWR;
} else {
flags |= O_WRONLY;
}
flags = cloexec_flags(flags, &options);
do {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(old_fname.c_str(), flags,
GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
s = IOError("while reopen file for write", fname, errno);
return s;
}
SetFD_CLOEXEC(fd, &options);
// rename into place
if (rename(old_fname.c_str(), fname.c_str()) != 0) {
s = IOError("while rename file to " + fname, old_fname, errno);
close(fd);
return s;
}
if (options.use_mmap_writes) {
MaybeForceDisableMmap(fd);
}
if (options.use_mmap_writes && !forceMmapOff_) {
result->reset(new PosixMmapFile(fname, fd, page_size_, options));
} else if (options.use_direct_writes && !options.use_mmap_writes) {
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
s = IOError("while fcntl NoCache for reopened file for append", fname,
errno);
return s;
}
#elif defined(OS_SOLARIS)
if (directio(fd, DIRECTIO_ON) == -1) {
if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
close(fd);
s = IOError("while calling directio()", fname, errno);
return s;
}
}
#endif
result->reset(new PosixWritableFile(
fname, fd, GetLogicalBlockSizeForWriteIfNeeded(options, fname, fd),
options));
} else {
// disable mmap writes
FileOptions no_mmap_writes_options = options;
no_mmap_writes_options.use_mmap_writes = false;
result->reset(
new PosixWritableFile(fname, fd,
GetLogicalBlockSizeForWriteIfNeeded(
no_mmap_writes_options, fname, fd),
no_mmap_writes_options));
}
return s;
}
IOStatus NewRandomRWFile(const std::string& fname, const FileOptions& options,
std::unique_ptr<FSRandomRWFile>* result,
IODebugContext* /*dbg*/) override {
int fd = -1;
int flags = cloexec_flags(O_RDWR, &options);
while (fd < 0) {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
if (fd < 0) {
// Error while opening the file
if (errno == EINTR) {
continue;
}
return IOError("While open file for random read/write", fname, errno);
}
}
SetFD_CLOEXEC(fd, &options);
result->reset(new PosixRandomRWFile(fname, fd, options));
return IOStatus::OK();
}
IOStatus NewMemoryMappedFileBuffer(
const std::string& fname,
std::unique_ptr<MemoryMappedFileBuffer>* result) override {
int fd = -1;
IOStatus status;
int flags = cloexec_flags(O_RDWR, nullptr);
while (fd < 0) {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, 0644);
if (fd < 0) {
// Error while opening the file
if (errno == EINTR) {
continue;
}
status =
IOError("While open file for raw mmap buffer access", fname, errno);
break;
}
}
uint64_t size;
if (status.ok()) {
IOOptions opts;
status = GetFileSize(fname, opts, &size, nullptr);
}
void* base = nullptr;
if (status.ok()) {
base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
status = IOError("while mmap file for read", fname, errno);
}
}
if (status.ok()) {
result->reset(
new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size)));
}
if (fd >= 0) {
// don't need to keep it open after mmap has been called
close(fd);
}
return status;
}
IOStatus NewDirectory(const std::string& name, const IOOptions& /*opts*/,
std::unique_ptr<FSDirectory>* result,
IODebugContext* /*dbg*/) override {
result->reset();
int fd;
int flags = cloexec_flags(0, nullptr);
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(name.c_str(), flags);
}
if (fd < 0) {
return IOError("While open directory", name, errno);
} else {
result->reset(new PosixDirectory(fd, name));
}
return IOStatus::OK();
}
IOStatus FileExists(const std::string& fname, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
int result = access(fname.c_str(), F_OK);
if (result == 0) {
return IOStatus::OK();
}
int err = errno;
switch (err) {
case EACCES:
case ELOOP:
case ENAMETOOLONG:
case ENOENT:
case ENOTDIR:
return IOStatus::NotFound();
default:
assert(err == EIO || err == ENOMEM);
return IOStatus::IOError("Unexpected error(" + std::to_string(err) +
") accessing file `" + fname + "' ");
}
}
IOStatus GetChildren(const std::string& dir, const IOOptions& opts,
std::vector<std::string>* result,
IODebugContext* /*dbg*/) override {
result->clear();
DIR* d = opendir(dir.c_str());
if (d == nullptr) {
switch (errno) {
case EACCES:
case ENOENT:
case ENOTDIR:
return IOStatus::NotFound();
default:
return IOError("While opendir", dir, errno);
}
}
// reset errno before calling readdir()
errno = 0;
struct dirent* entry;
while ((entry = readdir(d)) != nullptr) {
// filter out '.' and '..' directory entries
// which appear only on some platforms
const bool ignore =
entry->d_type == DT_DIR &&
(strcmp(entry->d_name, ".") == 0 ||
strcmp(entry->d_name, "..") == 0
#ifndef ASSERT_STATUS_CHECKED
// In case of ASSERT_STATUS_CHECKED, GetChildren support older
// version of API for debugging purpose.
|| opts.do_not_recurse
#endif
);
if (!ignore) {
result->push_back(entry->d_name);
}
errno = 0; // reset errno if readdir() success
}
// always attempt to close the dir
const auto pre_close_errno = errno; // errno may be modified by closedir
const int close_result = closedir(d);
if (pre_close_errno != 0) {
// error occurred during readdir
return IOError("While readdir", dir, pre_close_errno);
}
if (close_result != 0) {
// error occurred during closedir
return IOError("While closedir", dir, errno);
}
return IOStatus::OK();
}
IOStatus DeleteFile(const std::string& fname, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
IOStatus result;
if (unlink(fname.c_str()) != 0) {
result = IOError("while unlink() file", fname, errno);
}
return result;
}
IOStatus CreateDir(const std::string& name, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (mkdir(name.c_str(), 0755) != 0) {
return IOError("While mkdir", name, errno);
}
return IOStatus::OK();
}
IOStatus CreateDirIfMissing(const std::string& name,
const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (mkdir(name.c_str(), 0755) != 0) {
if (errno != EEXIST) {
return IOError("While mkdir if missing", name, errno);
} else if (!DirExists(name)) { // Check that name is actually a
// directory.
// Message is taken from mkdir
return IOStatus::IOError("`" + name +
"' exists but is not a directory");
}
}
return IOStatus::OK();
}
IOStatus DeleteDir(const std::string& name, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (rmdir(name.c_str()) != 0) {
return IOError("file rmdir", name, errno);
}
return IOStatus::OK();
}
IOStatus GetFileSize(const std::string& fname, const IOOptions& /*opts*/,
uint64_t* size, IODebugContext* /*dbg*/) override {
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
*size = 0;
return IOError("while stat a file for size", fname, errno);
} else {
*size = sbuf.st_size;
}
return IOStatus::OK();
}
IOStatus GetFileModificationTime(const std::string& fname,
const IOOptions& /*opts*/,
uint64_t* file_mtime,
IODebugContext* /*dbg*/) override {
struct stat s;
if (stat(fname.c_str(), &s) != 0) {
return IOError("while stat a file for modification time", fname, errno);
}
*file_mtime = static_cast<uint64_t>(s.st_mtime);
return IOStatus::OK();
}
IOStatus RenameFile(const std::string& src, const std::string& target,
const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (rename(src.c_str(), target.c_str()) != 0) {
return IOError("While renaming a file to " + target, src, errno);
}
return IOStatus::OK();
}
IOStatus LinkFile(const std::string& src, const std::string& target,
const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
if (link(src.c_str(), target.c_str()) != 0) {
if (errno == EXDEV || errno == ENOTSUP) {
return IOStatus::NotSupported(errno == EXDEV
? "No cross FS links allowed"
: "Links not supported by FS");
}
return IOError("while link file to " + target, src, errno);
}
return IOStatus::OK();
}
IOStatus NumFileLinks(const std::string& fname, const IOOptions& /*opts*/,
uint64_t* count, IODebugContext* /*dbg*/) override {
struct stat s;
if (stat(fname.c_str(), &s) != 0) {
return IOError("while stat a file for num file links", fname, errno);
}
*count = static_cast<uint64_t>(s.st_nlink);
return IOStatus::OK();
}
IOStatus AreFilesSame(const std::string& first, const std::string& second,
const IOOptions& /*opts*/, bool* res,
IODebugContext* /*dbg*/) override {
struct stat statbuf[2];
if (stat(first.c_str(), &statbuf[0]) != 0) {
return IOError("stat file", first, errno);
}
if (stat(second.c_str(), &statbuf[1]) != 0) {
return IOError("stat file", second, errno);
}
if (major(statbuf[0].st_dev) != major(statbuf[1].st_dev) ||
minor(statbuf[0].st_dev) != minor(statbuf[1].st_dev) ||
statbuf[0].st_ino != statbuf[1].st_ino) {
*res = false;
} else {
*res = true;
}
return IOStatus::OK();
}
IOStatus LockFile(const std::string& fname, const IOOptions& /*opts*/,
FileLock** lock, IODebugContext* /*dbg*/) override {
*lock = nullptr;
LockHoldingInfo lhi;
int64_t current_time = 0;
// Ignore status code as the time is only used for error message.
SystemClock::Default()
->GetCurrentTime(&current_time)
.PermitUncheckedError();
lhi.acquire_time = current_time;
lhi.acquiring_thread = Env::Default()->GetThreadID();
mutex_locked_files.Lock();
// If it already exists in the locked_files set, then it is already locked,
// and fail this lock attempt. Otherwise, insert it into locked_files.
// This check is needed because fcntl() does not detect lock conflict
// if the fcntl is issued by the same thread that earlier acquired
// this lock.
// We must do this check *before* opening the file:
// Otherwise, we will open a new file descriptor. Locks are associated with
// a process, not a file descriptor and when *any* file descriptor is
// closed, all locks the process holds for that *file* are released
const auto it_success = locked_files.insert({fname, lhi});
if (it_success.second == false) {
LockHoldingInfo prev_info = it_success.first->second;
mutex_locked_files.Unlock();
errno = ENOLCK;
// Note that the thread ID printed is the same one as the one in
// posix logger, but posix logger prints it hex format.
return IOError("lock hold by current process, acquire time " +
std::to_string(prev_info.acquire_time) +
" acquiring thread " +
std::to_string(prev_info.acquiring_thread),
fname, errno);
}
IOStatus result = IOStatus::OK();
int fd;
int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr);
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), flags, 0644);
}
if (fd < 0) {
result = IOError("while open a file for lock", fname, errno);
} else if (LockOrUnlock(fd, true) == -1) {
result = IOError("While lock file", fname, errno);
close(fd);
} else {
SetFD_CLOEXEC(fd, nullptr);
PosixFileLock* my_lock = new PosixFileLock;
my_lock->fd_ = fd;
my_lock->filename = fname;
*lock = my_lock;
}
if (!result.ok()) {
// If there is an error in locking, then remove the pathname from
// locked_files. (If we got this far, it did not exist in locked_files
// before this call.)
locked_files.erase(fname);
}
mutex_locked_files.Unlock();
return result;
}
IOStatus UnlockFile(FileLock* lock, const IOOptions& /*opts*/,
IODebugContext* /*dbg*/) override {
PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
IOStatus result;
mutex_locked_files.Lock();
// If we are unlocking, then verify that we had locked it earlier,
// it should already exist in locked_files. Remove it from locked_files.
if (locked_files.erase(my_lock->filename) != 1) {
errno = ENOLCK;
result = IOError("unlock", my_lock->filename, errno);
} else if (LockOrUnlock(my_lock->fd_, false) == -1) {
result = IOError("unlock", my_lock->filename, errno);
}
close(my_lock->fd_);
my_lock->Clear();
delete my_lock;
mutex_locked_files.Unlock();
return result;
}
IOStatus GetAbsolutePath(const std::string& db_path,
const IOOptions& /*opts*/, std::string* output_path,
IODebugContext* /*dbg*/) override {
if (!db_path.empty() && db_path[0] == '/') {
*output_path = db_path;
return IOStatus::OK();
}
char the_path[4096];
char* ret = getcwd(the_path, 4096);
if (ret == nullptr) {
return IOStatus::IOError(errnoStr(errno).c_str());
}
*output_path = ret;
return IOStatus::OK();
}
IOStatus GetTestDirectory(const IOOptions& /*opts*/, std::string* result,
IODebugContext* /*dbg*/) override {
const char* env = getenv("TEST_TMPDIR");
if (env && env[0] != '\0') {
*result = env;
} else {
char buf[100];
snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
*result = buf;
}
// Directory may already exist
{
IOOptions opts;
return CreateDirIfMissing(*result, opts, nullptr);
}
return IOStatus::OK();
}
IOStatus GetFreeSpace(const std::string& fname, const IOOptions& /*opts*/,
uint64_t* free_space,
IODebugContext* /*dbg*/) override {
struct statvfs sbuf;
if (statvfs(fname.c_str(), &sbuf) < 0) {
return IOError("While doing statvfs", fname, errno);
}
// sbuf.bfree is total free space available to root
// sbuf.bavail is total free space available to unprivileged user
// sbuf.bavail <= sbuf.bfree ... pick correct based upon effective user id
if (geteuid()) {
// non-zero user is unprivileged, or -1 if error. take more conservative
// size
*free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bavail);
} else {
// root user can access all disk space
*free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
}
return IOStatus::OK();
}
IOStatus IsDirectory(const std::string& path, const IOOptions& /*opts*/,
bool* is_dir, IODebugContext* /*dbg*/) override {
// First open
int fd = -1;
int flags = cloexec_flags(O_RDONLY, nullptr);
{
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(path.c_str(), flags);
}
if (fd < 0) {
return IOError("While open for IsDirectory()", path, errno);
}
IOStatus io_s;
struct stat sbuf;
if (fstat(fd, &sbuf) < 0) {
io_s = IOError("While doing stat for IsDirectory()", path, errno);
}
close(fd);
if (io_s.ok() && nullptr != is_dir) {
*is_dir = S_ISDIR(sbuf.st_mode);
}
return io_s;
}
FileOptions OptimizeForLogWrite(const FileOptions& file_options,
const DBOptions& db_options) const override {
FileOptions optimized = file_options;
optimized.use_mmap_writes = false;
optimized.use_direct_writes = false;
optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
// TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
optimized.fallocate_with_keep_size = true;
optimized.writable_file_max_buffer_size =
db_options.writable_file_max_buffer_size;
return optimized;
}
FileOptions OptimizeForManifestWrite(
const FileOptions& file_options) const override {
FileOptions optimized = file_options;
optimized.use_mmap_writes = false;
optimized.use_direct_writes = false;
optimized.fallocate_with_keep_size = true;
return optimized;
}
#ifdef OS_LINUX
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return logical_block_size_cache_.RefAndCacheLogicalBlockSize(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
logical_block_size_cache_.UnrefAndTryRemoveCachedLogicalBlockSize(paths);
return Status::OK();
}
#endif
private:
bool forceMmapOff_ = false; // do we override Env options?
// Returns true iff the named directory exists and is a directory.
virtual bool DirExists(const std::string& dname) {
struct stat statbuf;
if (stat(dname.c_str(), &statbuf) == 0) {
return S_ISDIR(statbuf.st_mode);
}
return false; // stat() failed return false
}
bool SupportsFastAllocate(int fd) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
struct statfs s;
if (fstatfs(fd, &s)) {
return false;
}
switch (s.f_type) {
case EXT4_SUPER_MAGIC:
return true;
case XFS_SUPER_MAGIC:
return true;
case TMPFS_MAGIC:
return true;
default:
return false;
}
#else
(void)fd;
return false;
#endif
}
void MaybeForceDisableMmap(int fd) {
static std::once_flag s_check_disk_for_mmap_once;
assert(this == FileSystem::Default().get());
std::call_once(
s_check_disk_for_mmap_once,
[this](int fdesc) {
// this will be executed once in the program's lifetime.
// do not use mmapWrite on non ext-3/xfs/tmpfs systems.
if (!SupportsFastAllocate(fdesc)) {
forceMmapOff_ = true;
}
},
fd);
}
#ifdef ROCKSDB_IOURING_PRESENT
bool IsIOUringEnabled() {
if (RocksDbIOUringEnable && RocksDbIOUringEnable()) {
return true;
} else {
return false;
}
}
#endif // ROCKSDB_IOURING_PRESENT
// EXPERIMENTAL
//
// TODO akankshamahajan:
// 1. Update Poll API to take into account min_completions
// and returns if number of handles in io_handles (any order) completed is
// equal to atleast min_completions.
// 2. Currently in case of direct_io, Read API is called because of which call
// to Poll API fails as it expects IOHandle to be populated.
virtual IOStatus Poll(std::vector<void*>& io_handles,
size_t /*min_completions*/) override {
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
}
// Init failed, platform doesn't support io_uring.
if (iu == nullptr) {
return IOStatus::NotSupported("Poll");
}
for (size_t i = 0; i < io_handles.size(); i++) {
// The request has been completed in earlier runs.
if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
continue;
}
// Loop until IO for io_handles[i] is completed.
while (true) {
// io_uring_wait_cqe.
struct io_uring_cqe* cqe = nullptr;
ssize_t ret = io_uring_wait_cqe(iu, &cqe);
if (ret) {
// abort as it shouldn't be in indeterminate state and there is no
// good way currently to handle this error.
abort();
}
// Step 3: Populate the request.
assert(cqe != nullptr);
Posix_IOHandle* posix_handle =
static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
assert(posix_handle->iu == iu);
if (posix_handle->iu != iu) {
return IOStatus::IOError("");
}
// Reset cqe data to catch any stray reuse of it
static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
FSReadRequest req;
req.scratch = posix_handle->scratch;
req.offset = posix_handle->offset;
req.len = posix_handle->len;
size_t finished_len = 0;
size_t bytes_read = 0;
bool read_again = false;
UpdateResult(cqe, "", req.len, posix_handle->iov.iov_len,
true /*async_read*/, posix_handle->use_direct_io,
posix_handle->alignment, finished_len, &req, bytes_read,
read_again);
posix_handle->is_finished = true;
io_uring_cqe_seen(iu, cqe);
posix_handle->cb(req, posix_handle->cb_arg);
(void)finished_len;
(void)bytes_read;
(void)read_again;
if (static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
break;
}
}
}
return IOStatus::OK();
#else
(void)io_handles;
return IOStatus::NotSupported("Poll");
#endif
}
virtual IOStatus AbortIO(std::vector<void*>& io_handles) override {
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring_queue_init.
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
}
// Init failed, platform doesn't support io_uring.
// If Poll is not supported then it didn't submit any request and it should
// return OK.
if (iu == nullptr) {
return IOStatus::OK();
}
for (size_t i = 0; i < io_handles.size(); i++) {
Posix_IOHandle* posix_handle =
static_cast<Posix_IOHandle*>(io_handles[i]);
if (posix_handle->is_finished == true) {
continue;
}
assert(posix_handle->iu == iu);
if (posix_handle->iu != iu) {
return IOStatus::IOError("");
}
// Prepare the cancel request.
struct io_uring_sqe* sqe;
sqe = io_uring_get_sqe(iu);
// In order to cancel the request, sqe->addr of cancel request should
// match with the read request submitted which is posix_handle->iov.
io_uring_prep_cancel(sqe, &posix_handle->iov, 0);
// Sets sqe->user_data to posix_handle.
io_uring_sqe_set_data(sqe, posix_handle);
// submit the request.
ssize_t ret = io_uring_submit(iu);
if (ret < 0) {
fprintf(stderr, "io_uring_submit error: %ld\n", long(ret));
return IOStatus::IOError("io_uring_submit() requested but returned " +
std::to_string(ret));
}
}
// After submitting the requests, wait for the requests.
for (size_t i = 0; i < io_handles.size(); i++) {
if ((static_cast<Posix_IOHandle*>(io_handles[i]))->is_finished) {
continue;
}
while (true) {
struct io_uring_cqe* cqe = nullptr;
ssize_t ret = io_uring_wait_cqe(iu, &cqe);
if (ret) {
// abort as it shouldn't be in indeterminate state and there is no
// good way currently to handle this error.
abort();
}
assert(cqe != nullptr);
// Returns cqe->user_data.
Posix_IOHandle* posix_handle =
static_cast<Posix_IOHandle*>(io_uring_cqe_get_data(cqe));
assert(posix_handle->iu == iu);
if (posix_handle->iu != iu) {
return IOStatus::IOError("");
}
posix_handle->req_count++;
// Reset cqe data to catch any stray reuse of it
static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
io_uring_cqe_seen(iu, cqe);
// - If the request is cancelled successfully, the original request is
// completed with -ECANCELED and the cancel request is completed with
// a result of 0.
// - If the request was already running, the original may or
// may not complete in error. The cancel request will complete with
// -EALREADY for that case.
// - And finally, if the request to cancel wasn't
// found, the cancel request is completed with -ENOENT.
//
// Every handle has to wait for 2 requests completion: original one and
// the cancel request which is tracked by PosixHandle::req_count.
if (posix_handle->req_count == 2 &&
static_cast<Posix_IOHandle*>(io_handles[i]) == posix_handle) {
posix_handle->is_finished = true;
FSReadRequest req;
req.status = IOStatus::Aborted();
posix_handle->cb(req, posix_handle->cb_arg);
break;
}
}
}
return IOStatus::OK();
#else
// If Poll is not supported then it didn't submit any request and it should
// return OK.
(void)io_handles;
return IOStatus::OK();
#endif
}
#if defined(ROCKSDB_IOURING_PRESENT)
// io_uring instance
std::unique_ptr<ThreadLocalPtr> thread_local_io_urings_;
#endif
size_t page_size_;
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool allow_non_owner_access_;
#ifdef OS_LINUX
static LogicalBlockSizeCache logical_block_size_cache_;
#endif
static size_t GetLogicalBlockSize(const std::string& fname, int fd);
// In non-direct IO mode, this directly returns kDefaultPageSize.
// Otherwise call GetLogicalBlockSize.
static size_t GetLogicalBlockSizeForReadIfNeeded(const EnvOptions& options,
const std::string& fname,
int fd);
static size_t GetLogicalBlockSizeForWriteIfNeeded(const EnvOptions& options,
const std::string& fname,
int fd);
};
#ifdef OS_LINUX
LogicalBlockSizeCache PosixFileSystem::logical_block_size_cache_;
#endif
size_t PosixFileSystem::GetLogicalBlockSize(const std::string& fname, int fd) {
#ifdef OS_LINUX
return logical_block_size_cache_.GetLogicalBlockSize(fname, fd);
#else
(void)fname;
return PosixHelper::GetLogicalBlockSizeOfFd(fd);
#endif
}
size_t PosixFileSystem::GetLogicalBlockSizeForReadIfNeeded(
const EnvOptions& options, const std::string& fname, int fd) {
return options.use_direct_reads
? PosixFileSystem::GetLogicalBlockSize(fname, fd)
: kDefaultPageSize;
}
size_t PosixFileSystem::GetLogicalBlockSizeForWriteIfNeeded(
const EnvOptions& options, const std::string& fname, int fd) {
return options.use_direct_writes
? PosixFileSystem::GetLogicalBlockSize(fname, fd)
: kDefaultPageSize;
}
PosixFileSystem::PosixFileSystem()
: forceMmapOff_(false),
page_size_(getpagesize()),
allow_non_owner_access_(true) {
#if defined(ROCKSDB_IOURING_PRESENT)
// Test whether IOUring is supported, and if it does, create a managing
// object for thread local point so that in the future thread-local
// io_uring can be created.
struct io_uring* new_io_uring = CreateIOUring();
if (new_io_uring != nullptr) {
thread_local_io_urings_.reset(new ThreadLocalPtr(DeleteIOUring));
delete new_io_uring;
}
#endif
}
} // namespace
//
// Default Posix FileSystem
//
std::shared_ptr<FileSystem> FileSystem::Default() {
STATIC_AVOID_DESTRUCTION(std::shared_ptr<FileSystem>, instance)
(std::make_shared<PosixFileSystem>());
return instance;
}
#ifndef ROCKSDB_LITE
static FactoryFunc<FileSystem> posix_filesystem_reg =
ObjectLibrary::Default()->AddFactory<FileSystem>(
ObjectLibrary::PatternEntry("posix").AddSeparator("://", false),
[](const std::string& /* uri */, std::unique_ptr<FileSystem>* f,
std::string* /* errmsg */) {
f->reset(new PosixFileSystem());
return f->get();
});
#endif
} // namespace ROCKSDB_NAMESPACE
#endif