mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 16:30:56 +00:00
Stream
Summary: Add a simple policy for NVMe write time life hint Closes https://github.com/facebook/rocksdb/pull/3095 Differential Revision: D6298030 Pulled By: shligit fbshipit-source-id: 9a72a42e32e92193af11599eb71f0cf77448e24d
This commit is contained in:
parent
f1c5eaba56
commit
eefd75a228
|
@ -75,7 +75,7 @@ Status BuildTable(
|
||||||
InternalStats* internal_stats, TableFileCreationReason reason,
|
InternalStats* internal_stats, TableFileCreationReason reason,
|
||||||
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
|
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
|
||||||
TableProperties* table_properties, int level, const uint64_t creation_time,
|
TableProperties* table_properties, int level, const uint64_t creation_time,
|
||||||
const uint64_t oldest_key_time) {
|
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {
|
||||||
assert((column_family_id ==
|
assert((column_family_id ==
|
||||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
||||||
column_family_name.empty());
|
column_family_name.empty());
|
||||||
|
@ -117,6 +117,7 @@ Status BuildTable(
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
file->SetIOPriority(io_priority);
|
file->SetIOPriority(io_priority);
|
||||||
|
file->SetWriteLifeTimeHint(write_hint);
|
||||||
|
|
||||||
file_writer.reset(new WritableFileWriter(std::move(file), env_options,
|
file_writer.reset(new WritableFileWriter(std::move(file), env_options,
|
||||||
ioptions.statistics));
|
ioptions.statistics));
|
||||||
|
|
|
@ -79,6 +79,7 @@ extern Status BuildTable(
|
||||||
EventLogger* event_logger = nullptr, int job_id = 0,
|
EventLogger* event_logger = nullptr, int job_id = 0,
|
||||||
const Env::IOPriority io_priority = Env::IO_HIGH,
|
const Env::IOPriority io_priority = Env::IO_HIGH,
|
||||||
TableProperties* table_properties = nullptr, int level = -1,
|
TableProperties* table_properties = nullptr, int level = -1,
|
||||||
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0);
|
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
|
||||||
|
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET);
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
|
@ -1015,6 +1015,24 @@ Status ColumnFamilyData::SetOptions(
|
||||||
}
|
}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
|
// REQUIRES: DB mutex held
|
||||||
|
Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
|
||||||
|
if (initial_cf_options_.compaction_style != kCompactionStyleLevel) {
|
||||||
|
return Env::WLTH_NOT_SET;
|
||||||
|
}
|
||||||
|
if (level == 0) {
|
||||||
|
return Env::WLTH_MEDIUM;
|
||||||
|
}
|
||||||
|
int base_level = current_->storage_info()->base_level();
|
||||||
|
|
||||||
|
// L1: medium, L2: long, ...
|
||||||
|
if (level - base_level >= 2) {
|
||||||
|
return Env::WLTH_EXTREME;
|
||||||
|
}
|
||||||
|
return static_cast<Env::WriteLifeTimeHint>(level - base_level +
|
||||||
|
static_cast<int>(Env::WLTH_MEDIUM));
|
||||||
|
}
|
||||||
|
|
||||||
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
|
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
|
||||||
const ImmutableDBOptions* db_options,
|
const ImmutableDBOptions* db_options,
|
||||||
const EnvOptions& env_options,
|
const EnvOptions& env_options,
|
||||||
|
|
|
@ -339,6 +339,8 @@ class ColumnFamilyData {
|
||||||
|
|
||||||
bool initialized() const { return initialized_.load(); }
|
bool initialized() const { return initialized_.load(); }
|
||||||
|
|
||||||
|
Env::WriteLifeTimeHint CalculateSSTWriteHint(int level);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class ColumnFamilySet;
|
friend class ColumnFamilySet;
|
||||||
ColumnFamilyData(uint32_t id, const std::string& name,
|
ColumnFamilyData(uint32_t id, const std::string& name,
|
||||||
|
|
|
@ -297,7 +297,8 @@ CompactionJob::CompactionJob(
|
||||||
table_cache_(std::move(table_cache)),
|
table_cache_(std::move(table_cache)),
|
||||||
event_logger_(event_logger),
|
event_logger_(event_logger),
|
||||||
paranoid_file_checks_(paranoid_file_checks),
|
paranoid_file_checks_(paranoid_file_checks),
|
||||||
measure_io_stats_(measure_io_stats) {
|
measure_io_stats_(measure_io_stats),
|
||||||
|
write_hint_(Env::WLTH_NOT_SET) {
|
||||||
assert(log_buffer_ != nullptr);
|
assert(log_buffer_ != nullptr);
|
||||||
const auto* cfd = compact_->compaction->column_family_data();
|
const auto* cfd = compact_->compaction->column_family_data();
|
||||||
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
|
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
|
||||||
|
@ -368,6 +369,8 @@ void CompactionJob::Prepare() {
|
||||||
assert(c->column_family_data()->current()->storage_info()
|
assert(c->column_family_data()->current()->storage_info()
|
||||||
->NumLevelFiles(compact_->compaction->level()) > 0);
|
->NumLevelFiles(compact_->compaction->level()) > 0);
|
||||||
|
|
||||||
|
write_hint_ = c->column_family_data()->CalculateSSTWriteHint(
|
||||||
|
c->output_level());
|
||||||
// Is this compaction producing files at the bottommost level?
|
// Is this compaction producing files at the bottommost level?
|
||||||
bottommost_level_ = c->bottommost_level();
|
bottommost_level_ = c->bottommost_level();
|
||||||
|
|
||||||
|
@ -1305,6 +1308,7 @@ Status CompactionJob::OpenCompactionOutputFile(
|
||||||
|
|
||||||
sub_compact->outputs.push_back(out);
|
sub_compact->outputs.push_back(out);
|
||||||
writable_file->SetIOPriority(Env::IO_LOW);
|
writable_file->SetIOPriority(Env::IO_LOW);
|
||||||
|
writable_file->SetWriteLifeTimeHint(write_hint_);
|
||||||
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
|
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
|
||||||
sub_compact->compaction->OutputFilePreallocationSize()));
|
sub_compact->compaction->OutputFilePreallocationSize()));
|
||||||
sub_compact->outfile.reset(new WritableFileWriter(
|
sub_compact->outfile.reset(new WritableFileWriter(
|
||||||
|
|
|
@ -167,6 +167,7 @@ class CompactionJob {
|
||||||
std::vector<Slice> boundaries_;
|
std::vector<Slice> boundaries_;
|
||||||
// Stores the approx size of keys covered in the range of each subcompaction
|
// Stores the approx size of keys covered in the range of each subcompaction
|
||||||
std::vector<uint64_t> sizes_;
|
std::vector<uint64_t> sizes_;
|
||||||
|
Env::WriteLifeTimeHint write_hint_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
|
@ -1316,6 +1316,9 @@ class DBImpl : public DB {
|
||||||
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
|
bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
|
||||||
|
|
||||||
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
|
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
|
||||||
|
Env::WriteLifeTimeHint CalculateWALWriteHint() {
|
||||||
|
return Env::WLTH_SHORT;
|
||||||
|
}
|
||||||
|
|
||||||
// When set, we use a seprate queue for writes that dont write to memtable. In
|
// When set, we use a seprate queue for writes that dont write to memtable. In
|
||||||
// 2PC these are the writes at Prepare phase.
|
// 2PC these are the writes at Prepare phase.
|
||||||
|
|
|
@ -893,6 +893,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||||
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
||||||
|
|
||||||
{
|
{
|
||||||
|
auto write_hint = cfd->CalculateSSTWriteHint(0);
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
|
|
||||||
SequenceNumber earliest_write_conflict_snapshot;
|
SequenceNumber earliest_write_conflict_snapshot;
|
||||||
|
@ -913,7 +914,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||||
cfd->ioptions()->compression_opts, paranoid_file_checks,
|
cfd->ioptions()->compression_opts, paranoid_file_checks,
|
||||||
cfd->internal_stats(), TableFileCreationReason::kRecovery,
|
cfd->internal_stats(), TableFileCreationReason::kRecovery,
|
||||||
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
|
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
|
||||||
-1 /* level */, current_time);
|
-1 /* level */, current_time, write_hint);
|
||||||
LogFlush(immutable_db_options_.info_log);
|
LogFlush(immutable_db_options_.info_log);
|
||||||
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
||||||
"[%s] [WriteLevel0TableForRecovery]"
|
"[%s] [WriteLevel0TableForRecovery]"
|
||||||
|
@ -1007,6 +1008,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
impl->mutex_.Lock();
|
impl->mutex_.Lock();
|
||||||
|
auto write_hint = impl->CalculateWALWriteHint();
|
||||||
// Handles create_if_missing, error_if_exists
|
// Handles create_if_missing, error_if_exists
|
||||||
s = impl->Recover(column_families);
|
s = impl->Recover(column_families);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
|
@ -1022,6 +1024,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||||
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
|
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
|
||||||
&lfile, opt_env_options);
|
&lfile, opt_env_options);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
|
lfile->SetWriteLifeTimeHint(write_hint);
|
||||||
lfile->SetPreallocationBlockSize(
|
lfile->SetPreallocationBlockSize(
|
||||||
impl->GetWalPreallocateBlockSize(max_write_buffer_size));
|
impl->GetWalPreallocateBlockSize(max_write_buffer_size));
|
||||||
{
|
{
|
||||||
|
|
|
@ -1168,6 +1168,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||||
BuildDBOptions(immutable_db_options_, mutable_db_options_);
|
BuildDBOptions(immutable_db_options_, mutable_db_options_);
|
||||||
const auto preallocate_block_size =
|
const auto preallocate_block_size =
|
||||||
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
|
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
|
||||||
|
auto write_hint = CalculateWALWriteHint();
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
{
|
{
|
||||||
if (creating_new_log) {
|
if (creating_new_log) {
|
||||||
|
@ -1193,6 +1194,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||||
// use preallocate_block_size instead
|
// use preallocate_block_size instead
|
||||||
// of calling GetWalPreallocateBlockSize()
|
// of calling GetWalPreallocateBlockSize()
|
||||||
lfile->SetPreallocationBlockSize(preallocate_block_size);
|
lfile->SetPreallocationBlockSize(preallocate_block_size);
|
||||||
|
lfile->SetWriteLifeTimeHint(write_hint);
|
||||||
unique_ptr<WritableFileWriter> file_writer(
|
unique_ptr<WritableFileWriter> file_writer(
|
||||||
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
||||||
new_log = new log::Writer(
|
new_log = new log::Writer(
|
||||||
|
|
|
@ -246,6 +246,7 @@ Status FlushJob::WriteLevel0Table() {
|
||||||
const uint64_t start_micros = db_options_.env->NowMicros();
|
const uint64_t start_micros = db_options_.env->NowMicros();
|
||||||
Status s;
|
Status s;
|
||||||
{
|
{
|
||||||
|
auto write_hint = cfd_->CalculateSSTWriteHint(0);
|
||||||
db_mutex_->Unlock();
|
db_mutex_->Unlock();
|
||||||
if (log_buffer_) {
|
if (log_buffer_) {
|
||||||
log_buffer_->FlushBufferToLog();
|
log_buffer_->FlushBufferToLog();
|
||||||
|
@ -315,7 +316,7 @@ Status FlushJob::WriteLevel0Table() {
|
||||||
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
|
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
|
||||||
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
|
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
|
||||||
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
|
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
|
||||||
oldest_key_time);
|
oldest_key_time, write_hint);
|
||||||
LogFlush(db_options_.info_log);
|
LogFlush(db_options_.info_log);
|
||||||
}
|
}
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
ROCKS_LOG_INFO(db_options_.info_log,
|
||||||
|
|
|
@ -401,6 +401,8 @@ class Repairer {
|
||||||
status = env_->GetCurrentTime(&_current_time); // ignore error
|
status = env_->GetCurrentTime(&_current_time); // ignore error
|
||||||
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
||||||
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
|
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
|
||||||
|
|
||||||
|
auto write_hint = cfd->CalculateSSTWriteHint(0);
|
||||||
status = BuildTable(
|
status = BuildTable(
|
||||||
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
|
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
|
||||||
env_options_, table_cache_, iter.get(),
|
env_options_, table_cache_, iter.get(),
|
||||||
|
@ -411,7 +413,7 @@ class Repairer {
|
||||||
CompressionOptions(), false, nullptr /* internal_stats */,
|
CompressionOptions(), false, nullptr /* internal_stats */,
|
||||||
TableFileCreationReason::kRecovery, nullptr /* event_logger */,
|
TableFileCreationReason::kRecovery, nullptr /* event_logger */,
|
||||||
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
|
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
|
||||||
-1 /* level */, current_time);
|
-1 /* level */, current_time, write_hint);
|
||||||
ROCKS_LOG_INFO(db_options_.info_log,
|
ROCKS_LOG_INFO(db_options_.info_log,
|
||||||
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
|
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
|
||||||
log, counter, meta.fd.GetNumber(),
|
log, counter, meta.fd.GetNumber(),
|
||||||
|
|
16
env/io_posix.cc
vendored
16
env/io_posix.cc
vendored
|
@ -35,6 +35,11 @@
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
|
|
||||||
|
#if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
|
||||||
|
#define F_LINUX_SPECIFIC_BASE 1024
|
||||||
|
#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
// A wrapper for fadvise, if the platform doesn't support fadvise,
|
// A wrapper for fadvise, if the platform doesn't support fadvise,
|
||||||
|
@ -858,6 +863,17 @@ bool PosixWritableFile::IsSyncThreadSafe() const { return true; }
|
||||||
|
|
||||||
uint64_t PosixWritableFile::GetFileSize() { return filesize_; }
|
uint64_t PosixWritableFile::GetFileSize() { return filesize_; }
|
||||||
|
|
||||||
|
void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
|
||||||
|
#ifdef OS_LINUX
|
||||||
|
if (hint == write_hint_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) {
|
||||||
|
write_hint_ = hint;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
|
Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
|
||||||
if (use_direct_io()) {
|
if (use_direct_io()) {
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
|
|
1
env/io_posix.h
vendored
1
env/io_posix.h
vendored
|
@ -132,6 +132,7 @@ class PosixWritableFile : public WritableFile {
|
||||||
virtual Status Fsync() override;
|
virtual Status Fsync() override;
|
||||||
virtual bool IsSyncThreadSafe() const override;
|
virtual bool IsSyncThreadSafe() const override;
|
||||||
virtual bool use_direct_io() const override { return use_direct_io_; }
|
virtual bool use_direct_io() const override { return use_direct_io_; }
|
||||||
|
virtual void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override;
|
||||||
virtual uint64_t GetFileSize() override;
|
virtual uint64_t GetFileSize() override;
|
||||||
virtual Status InvalidateCache(size_t offset, size_t length) override;
|
virtual Status InvalidateCache(size_t offset, size_t length) override;
|
||||||
virtual size_t GetRequiredBufferAlignment() const override {
|
virtual size_t GetRequiredBufferAlignment() const override {
|
||||||
|
|
|
@ -152,6 +152,16 @@ class Env {
|
||||||
unique_ptr<RandomAccessFile>* result,
|
unique_ptr<RandomAccessFile>* result,
|
||||||
const EnvOptions& options)
|
const EnvOptions& options)
|
||||||
= 0;
|
= 0;
|
||||||
|
// These values match Linux definition
|
||||||
|
// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/uapi/linux/fcntl.h#n56
|
||||||
|
enum WriteLifeTimeHint {
|
||||||
|
WLTH_NOT_SET = 0, // No hint information set
|
||||||
|
WLTH_NONE, // No hints about write life time
|
||||||
|
WLTH_SHORT, // Data written has a short life time
|
||||||
|
WLTH_MEDIUM, // Data written has a medium life time
|
||||||
|
WLTH_LONG, // Data written has a long life time
|
||||||
|
WLTH_EXTREME, // Data written has an extremely long life time
|
||||||
|
};
|
||||||
|
|
||||||
// Create an object that writes to a new file with the specified
|
// Create an object that writes to a new file with the specified
|
||||||
// name. Deletes any existing file with the same name and creates a
|
// name. Deletes any existing file with the same name and creates a
|
||||||
|
@ -573,7 +583,8 @@ class WritableFile {
|
||||||
WritableFile()
|
WritableFile()
|
||||||
: last_preallocated_block_(0),
|
: last_preallocated_block_(0),
|
||||||
preallocation_block_size_(0),
|
preallocation_block_size_(0),
|
||||||
io_priority_(Env::IO_TOTAL) {
|
io_priority_(Env::IO_TOTAL),
|
||||||
|
write_hint_(Env::WLTH_NOT_SET) {
|
||||||
}
|
}
|
||||||
virtual ~WritableFile();
|
virtual ~WritableFile();
|
||||||
|
|
||||||
|
@ -650,6 +661,11 @@ class WritableFile {
|
||||||
|
|
||||||
virtual Env::IOPriority GetIOPriority() { return io_priority_; }
|
virtual Env::IOPriority GetIOPriority() { return io_priority_; }
|
||||||
|
|
||||||
|
virtual void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
|
||||||
|
write_hint_ = hint;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual Env::WriteLifeTimeHint GetWriteLifeTimeHint() { return write_hint_; }
|
||||||
/*
|
/*
|
||||||
* Get the size of valid data in the file.
|
* Get the size of valid data in the file.
|
||||||
*/
|
*/
|
||||||
|
@ -738,6 +754,7 @@ class WritableFile {
|
||||||
friend class WritableFileMirror;
|
friend class WritableFileMirror;
|
||||||
|
|
||||||
Env::IOPriority io_priority_;
|
Env::IOPriority io_priority_;
|
||||||
|
Env::WriteLifeTimeHint write_hint_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// A file abstraction for random reading and writing.
|
// A file abstraction for random reading and writing.
|
||||||
|
|
Loading…
Reference in a new issue