// File: rocksdb/env/env.cc (C++, 494 lines, 14 KiB)

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/env.h"
#include <thread>
#include "logging/env_logger.h"
#include "memory/arena.h"
#include "options/db_options.h"
#include "port/port.h"
#include "port/sys_time.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/autovector.h"
namespace rocksdb {
// Out-of-line destructor for Env; performs no work of its own.
Env::~Env() = default;
// Creates a logger that writes to `fname` through this Env by delegating to
// NewEnvLogger. On success `*result` holds the new logger.
Status Env::NewLogger(const std::string& fname,
                      std::shared_ptr<Logger>* result) {
  Status creation_status = NewEnvLogger(fname, this, result);
  return creation_status;
}
// Looks up a statically registered Env named `value` in the object registry.
// `*result` is read as the starting value and is overwritten only when the
// lookup succeeds. In LITE builds this always returns NotSupported.
Status Env::LoadEnv(const std::string& value, Env** result) {
  Env* candidate = *result;
#ifndef ROCKSDB_LITE
  Status s =
      ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &candidate);
#else
  Status s =
      Status::NotSupported("Cannot load environment in LITE mode: ", value);
#endif
  if (!s.ok()) {
    return s;
  }
  *result = candidate;
  return s;
}
// Creates an Env named `value` through the object registry.
//
// If the registry hands back an owned object, ownership is transferred to
// `*guard` and `*result` points at the guarded instance. If the registry
// returns an unowned Env, `*result` points at it and `*guard` is untouched.
// If nothing can be created, NotFound is returned and `*result` is set to
// Env::Default(). In LITE builds this always returns NotSupported and leaves
// both out-parameters untouched.
Status Env::LoadEnv(const std::string& value, Env** result,
                    std::shared_ptr<Env>* guard) {
  assert(result);
  Status s;
#ifndef ROCKSDB_LITE
  assert(guard != nullptr);
  std::string err_msg;
  std::unique_ptr<Env> owned_env;
  Env* loaded = ObjectRegistry::NewInstance()->NewObject<Env>(
      value, &owned_env, &err_msg);
  if (loaded == nullptr) {
    s = Status::NotFound(std::string("Cannot load ") + Env::Type() + ": " +
                         value);
    loaded = Env::Default();
  }
  if (!s.ok() || !owned_env) {
    // Either the lookup failed (fall back to the default Env) or the registry
    // returned an object it still owns.
    *result = loaded;
  } else {
    guard->reset(owned_env.release());
    *result = guard->get();
  }
#else
  (void)result;
  (void)guard;
  s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
#endif
  return s;
}
// Maps a thread-pool priority to its display name. Priority::TOTAL is not a
// real priority: it trips an assertion and otherwise yields "Invalid", as
// does any value not listed here.
std::string Env::PriorityToString(Env::Priority priority) {
  const char* label = "Invalid";
  switch (priority) {
    case Env::Priority::BOTTOM:
      label = "Bottom";
      break;
    case Env::Priority::LOW:
      label = "Low";
      break;
    case Env::Priority::HIGH:
      label = "High";
      break;
    case Env::Priority::USER:
      label = "User";
      break;
    case Env::Priority::TOTAL:
      assert(false);
      break;
  }
  return label;
}
// Returns the std::hash of the calling thread's std::thread::id.
uint64_t Env::GetThreadID() const {
  const std::thread::id current = std::this_thread::get_id();
  return std::hash<std::thread::id>()(current);
}
// Default implementation of file reuse: renames `old_fname` to `fname` and
// then opens `fname` as a new writable file. If the rename fails its status
// is returned and no file is opened.
Status Env::ReuseWritableFile(const std::string& fname,
                              const std::string& old_fname,
                              std::unique_ptr<WritableFile>* result,
                              const EnvOptions& options) {
  Status s = RenameFile(old_fname, fname);
  if (s.ok()) {
    s = NewWritableFile(fname, result, options);
  }
  return s;
}
// Lists the children of `dir` and fills `*result` with each child's name and
// size. Children whose size cannot be read because they no longer exist are
// skipped; any other size-query failure aborts and is returned.
Status Env::GetChildrenFileAttributes(const std::string& dir,
                                      std::vector<FileAttributes>* result) {
  assert(result != nullptr);
  std::vector<std::string> names;
  Status s = GetChildren(dir, &names);
  if (!s.ok()) {
    return s;
  }
  // Size for the worst case up front; shrink to the number kept at the end.
  result->resize(names.size());
  size_t kept = 0;
  for (std::string& name : names) {
    const std::string path = dir + "/" + name;
    FileAttributes& slot = (*result)[kept];
    s = GetFileSize(path, &slot.size_bytes);
    if (s.ok()) {
      slot.name = std::move(name);
      ++kept;
    } else if (!FileExists(path).IsNotFound()) {
      return s;
    }
    // Otherwise the file was deleted after the directory was listed; the
    // slot is reused for the next child.
  }
  result->resize(kept);
  return Status::OK();
}
// Out-of-line destructors for the file and logger interfaces; none of them
// does any work of its own.
SequentialFile::~SequentialFile() = default;

RandomAccessFile::~RandomAccessFile() = default;

WritableFile::~WritableFile() = default;

MemoryMappedFileBuffer::~MemoryMappedFileBuffer() = default;

Logger::~Logger() = default;
// Closes the logger. The first call marks the logger closed and forwards to
// CloseImpl(); every later call returns OK without calling CloseImpl() again.
Status Logger::Close() {
  if (closed_) {
    return Status::OK();
  }
  closed_ = true;
  return CloseImpl();
}
// Base implementation of the close hook invoked by Close(): reports
// NotSupported.
Status Logger::CloseImpl() {
  return Status::NotSupported();
}
// Out-of-line destructor for FileLock; performs no work of its own.
FileLock::~FileLock() = default;
// Flushes `info_log` if it is non-null; a null logger is ignored.
void LogFlush(Logger* info_log) {
  if (info_log == nullptr) {
    return;
  }
  info_log->Flush();
}
// va_list form of Log(): emits the message at INFO level when `info_log` is
// non-null and its threshold is INFO or lower.
static void Logv(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  if (info_log->GetInfoLogLevel() > InfoLogLevel::INFO_LEVEL) {
    return;
  }
  info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
}

// Logs a printf-style message at INFO level. A null logger is ignored.
void Log(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Logv(info_log, format, args);
  va_end(args);
}
// Level-aware logging entry point.
//
// Messages below this logger's threshold (log_level_) are dropped. INFO
// messages are forwarded unchanged, HEADER messages go to LogHeader(), and
// every other level is forwarded with a "[LEVEL] " prefix prepended to the
// format string.
//
// `format` and `ap` follow printf conventions.
void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
  static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN",
    "ERROR", "FATAL" };
  static const size_t kNumLevelNames =
      sizeof(kInfoLogLevelNames) / sizeof(kInfoLogLevelNames[0]);
  if (log_level < log_level_) {
    return;
  }

  if (log_level == InfoLogLevel::INFO_LEVEL) {
    // Doesn't print log level if it is INFO level.
    // This is to avoid unexpected performance regression after we add
    // the feature of log level. All the logs before we add the feature
    // are INFO level. We don't want to add extra costs to those existing
    // logging.
    Logv(format, ap);
    return;
  }
  if (log_level == InfoLogLevel::HEADER_LEVEL) {
    LogHeader(format, ap);
    return;
  }

  // Guard the table lookup: a level value outside the named range must not
  // index past the end of kInfoLogLevelNames.
  const size_t level_index = static_cast<size_t>(log_level);
  const char* level_name = level_index < kNumLevelNames
                               ? kInfoLogLevelNames[level_index]
                               : "UNKNOWN";

  char new_format[500];
  const int needed = snprintf(new_format, sizeof(new_format), "[%s] %s",
                              level_name, format);
  if (needed >= 0 && static_cast<size_t>(needed) < sizeof(new_format)) {
    // Common case: the prefixed format fits in the stack buffer.
    Logv(new_format, ap);
    return;
  }

  // The prefixed format did not fit (or snprintf failed). Passing a truncated
  // format on could cut a conversion specifier in half, so build the full
  // string on the heap instead.
  std::string long_format("[");
  long_format.append(level_name).append("] ").append(format);
  Logv(long_format.c_str(), ap);
}
// va_list form of Log(level, ...): drops the message when `info_log` is null
// or its threshold is above `log_level`; otherwise routes HEADER messages to
// LogHeader() and everything else to the level-aware Logger::Logv().
static void Logv(const InfoLogLevel log_level, Logger* info_log,
                 const char* format, va_list ap) {
  if (info_log == nullptr || info_log->GetInfoLogLevel() > log_level) {
    return;
  }
  if (log_level != InfoLogLevel::HEADER_LEVEL) {
    info_log->Logv(log_level, format, ap);
  } else {
    info_log->LogHeader(format, ap);
  }
}

// Logs a printf-style message at `log_level`. A null logger is ignored.
void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
         ...) {
  va_list args;
  va_start(args, format);
  Logv(log_level, info_log, format, args);
  va_end(args);
}
// va_list form of Header(): forwards to LogHeader() when `info_log` is
// non-null. No level check is made here.
static void Headerv(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  info_log->LogHeader(format, ap);
}

// Writes a printf-style header message. A null logger is ignored.
void Header(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Headerv(info_log, format, args);
  va_end(args);
}
// va_list form of Debug(): emits at DEBUG level when `info_log` is non-null
// and its threshold is DEBUG or lower.
static void Debugv(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  if (info_log->GetInfoLogLevel() > InfoLogLevel::DEBUG_LEVEL) {
    return;
  }
  info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
}

// Logs a printf-style message at DEBUG level. A null logger is ignored.
void Debug(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Debugv(info_log, format, args);
  va_end(args);
}
// va_list form of Info(): emits at INFO level when `info_log` is non-null
// and its threshold is INFO or lower.
static void Infov(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  if (info_log->GetInfoLogLevel() > InfoLogLevel::INFO_LEVEL) {
    return;
  }
  info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
}

// Logs a printf-style message at INFO level. A null logger is ignored.
void Info(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Infov(info_log, format, args);
  va_end(args);
}
// va_list form of Warn(): emits at WARN level when `info_log` is non-null
// and its threshold is WARN or lower.
static void Warnv(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  if (info_log->GetInfoLogLevel() > InfoLogLevel::WARN_LEVEL) {
    return;
  }
  info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
}

// Logs a printf-style message at WARN level. A null logger is ignored.
void Warn(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Warnv(info_log, format, args);
  va_end(args);
}
// va_list form of Error(): emits at ERROR level when `info_log` is non-null
// and its threshold is ERROR or lower.
static void Errorv(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  if (info_log->GetInfoLogLevel() > InfoLogLevel::ERROR_LEVEL) {
    return;
  }
  info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
}

// Logs a printf-style message at ERROR level. A null logger is ignored.
void Error(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Errorv(info_log, format, args);
  va_end(args);
}
// va_list form of Fatal(): emits at FATAL level when `info_log` is non-null
// and its threshold is FATAL or lower.
static void Fatalv(Logger* info_log, const char* format, va_list ap) {
  if (info_log == nullptr) {
    return;
  }
  if (info_log->GetInfoLogLevel() > InfoLogLevel::FATAL_LEVEL) {
    return;
  }
  info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
}

// Logs a printf-style message at FATAL level. A null logger is ignored.
void Fatal(Logger* info_log, const char* format, ...) {
  va_list args;
  va_start(args, format);
  Fatalv(info_log, format, args);
  va_end(args);
}
void LogFlush(const std::shared_ptr<Logger>& info_log) {
LogFlush(info_log.get());
}
void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
const char* format, ...) {
va_list ap;
va_start(ap, format);
Logv(log_level, info_log.get(), format, ap);
va_end(ap);
}
void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Headerv(info_log.get(), format, ap);
va_end(ap);
}
void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Debugv(info_log.get(), format, ap);
va_end(ap);
}
void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Infov(info_log.get(), format, ap);
va_end(ap);
}
void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Warnv(info_log.get(), format, ap);
va_end(ap);
}
void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Errorv(info_log.get(), format, ap);
va_end(ap);
}
void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Fatalv(info_log.get(), format, ap);
va_end(ap);
}
void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
va_list ap;
va_start(ap, format);
Logv(info_log.get(), format, ap);
va_end(ap);
}
// Creates `fname` through `env`, appends `data` to it, and syncs it when
// `should_sync` is set. If appending or syncing fails, the file is deleted
// (the result of that deletion is not checked) and the failure is returned.
Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
                         bool should_sync) {
  std::unique_ptr<WritableFile> file;
  EnvOptions soptions;
  Status s = env->NewWritableFile(fname, &file, soptions);
  if (!s.ok()) {
    return s;
  }

  s = file->Append(data);
  if (should_sync && s.ok()) {
    s = file->Sync();
  }
  if (s.ok()) {
    return s;
  }

  // Best-effort cleanup of the file after a failed write.
  env->DeleteFile(fname);
  return s;
}
// Reads the whole of `fname` through `env` into `*data`.
//
// `*data` is cleared first. The file is read sequentially in 8 KiB chunks
// until a read returns an empty fragment. If opening fails, the open status
// is returned and `*data` stays empty; if a later read fails, that status is
// returned and `*data` holds whatever was read before the failure.
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
  EnvOptions soptions;
  data->clear();
  std::unique_ptr<SequentialFile> file;
  Status s = env->NewSequentialFile(fname, &file, soptions);
  if (!s.ok()) {
    return s;
  }
  static const int kBufferSize = 8192;
  // Owned by a smart pointer so the scratch buffer is released on every exit
  // path, including an exception thrown by data->append().
  std::unique_ptr<char[]> space(new char[kBufferSize]);
  while (true) {
    Slice fragment;
    s = file->Read(kBufferSize, &fragment, space.get());
    if (!s.ok()) {
      break;
    }
    data->append(fragment.data(), fragment.size());
    if (fragment.empty()) {
      // An empty fragment marks end of file.
      break;
    }
  }
  return s;
}
// Out-of-line destructor for EnvWrapper; performs no work of its own.
EnvWrapper::~EnvWrapper() = default;
namespace { // anonymous namespace
void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->use_mmap_reads = options.allow_mmap_reads;
env_options->use_mmap_writes = options.allow_mmap_writes;
env_options->use_direct_reads = options.use_direct_reads;
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
env_options->bytes_per_sync = options.bytes_per_sync;
env_options->compaction_readahead_size = options.compaction_readahead_size;
env_options->random_access_max_buffer_size =
options.random_access_max_buffer_size;
env_options->rate_limiter = options.rate_limiter.get();
env_options->writable_file_max_buffer_size =
options.writable_file_max_buffer_size;
env_options->allow_fallocate = options.allow_fallocate;
Optionally wait on bytes_per_sync to smooth I/O (#5183) Summary: The existing implementation does not guarantee bytes reach disk every `bytes_per_sync` when writing SST files, or every `wal_bytes_per_sync` when writing WALs. This can cause confusing behavior for users who enable this feature to avoid large syncs during flush and compaction, but then end up hitting them anyways. My understanding of the existing behavior is we used `sync_file_range` with `SYNC_FILE_RANGE_WRITE` to submit ranges for async writeback, such that we could continue processing the next range of bytes while that I/O is happening. I believe we can preserve that benefit while also limiting how far the processing can get ahead of the I/O, which prevents huge syncs from happening when the file finishes. Consider this `sync_file_range` usage: `sync_file_range(fd_, 0, static_cast<off_t>(offset + nbytes), SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE)`. Expanding the range to start at 0 and adding the `SYNC_FILE_RANGE_WAIT_BEFORE` flag causes any pending writeback (like from a previous call to `sync_file_range`) to finish before it proceeds to submit the latest `nbytes` for writeback. The latest `nbytes` are still written back asynchronously, unless processing exceeds I/O speed, in which case the following `sync_file_range` will need to wait on it. There is a second change in this PR to use `fdatasync` when `sync_file_range` is unavailable (determined statically) or has some known problem with the underlying filesystem (determined dynamically). The above two changes only apply when the user enables a new option, `strict_bytes_per_sync`. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5183 Differential Revision: D14953553 Pulled By: siying fbshipit-source-id: 445c3862e019fb7b470f9c7f314fc231b62706e9
2019-04-22 18:48:45 +00:00
env_options->strict_bytes_per_sync = options.strict_bytes_per_sync;
options.env->SanitizeEnvOptions(env_options);
}
}
// Returns a copy of `env_options` adjusted for writing WAL files:
// bytes_per_sync is taken from wal_bytes_per_sync and the writable-file
// buffer size from `db_options`.
EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
                                    const DBOptions& db_options) const {
  EnvOptions tuned = env_options;
  tuned.bytes_per_sync = db_options.wal_bytes_per_sync;
  tuned.writable_file_max_buffer_size =
      db_options.writable_file_max_buffer_size;
  return tuned;
}
// Manifest writes need no adjustment in this base implementation: returns
// an unmodified copy of `env_options`.
EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
  EnvOptions unchanged = env_options;
  return unchanged;
}
// Returns a copy of `env_options` for reading WAL files, with direct reads
// turned off.
EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
  EnvOptions tuned = env_options;
  tuned.use_direct_reads = false;
  return tuned;
}
// Returns a copy of `env_options` for reading the manifest, with direct
// reads turned off.
EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
  EnvOptions tuned = env_options;
  tuned.use_direct_reads = false;
  return tuned;
}
// Returns a copy of `env_options` for writing compaction output tables:
// direct writes follow db_options.use_direct_io_for_flush_and_compaction.
EnvOptions Env::OptimizeForCompactionTableWrite(
    const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
  EnvOptions tuned = env_options;
  tuned.use_direct_writes = db_options.use_direct_io_for_flush_and_compaction;
  return tuned;
}
// Returns a copy of `env_options` for reading compaction input tables:
// direct reads follow db_options.use_direct_reads.
EnvOptions Env::OptimizeForCompactionTableRead(
    const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
  EnvOptions tuned = env_options;
  tuned.use_direct_reads = db_options.use_direct_reads;
  return tuned;
}
// Builds EnvOptions from the given DBOptions via AssignEnvOptions().
EnvOptions::EnvOptions(const DBOptions& options) {
  EnvOptions* const self = this;
  AssignEnvOptions(self, options);
}
// Builds EnvOptions from a default-constructed DBOptions.
EnvOptions::EnvOptions() {
  AssignEnvOptions(this, DBOptions());
}
// Opens `fname` for writing through `env` and wraps it in an EnvLogger.
// On success `*result` holds the new logger; on failure the open status is
// returned and `*result` is left untouched.
Status NewEnvLogger(const std::string& fname, Env* env,
                    std::shared_ptr<Logger>* result) {
  EnvOptions options;
  // TODO: Tune the buffer size.
  options.writable_file_max_buffer_size = 1024 * 1024;

  std::unique_ptr<WritableFile> log_file;
  Status open_status = env->NewWritableFile(fname, &log_file, options);
  if (open_status.ok()) {
    *result = std::make_shared<EnvLogger>(std::move(log_file), fname, options,
                                          env);
  }
  return open_status;
}
} // namespace rocksdb