mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 16:30:56 +00:00
5879053fd0
Summary: **Summary** Make the mempurge option flag a Mutable Column Family option flag. Therefore, the mempurge feature can be dynamically toggled. **Motivation** RocksDB users prefer having the ability to switch features on and off without having to close and reopen the DB. This is particularly important if the feature causes issues and needs to be turned off. Dynamically changing a DB option flag does not seem currently possible. Moreover, with this new change, the MemPurge feature can be toggled on or off independently between column families, which we see as a major improvement. **Content of this PR** This PR includes removal of the `experimental_mempurge_threshold` flag as a DB option flag, and its re-introduction as a `MutableCFOption` flag. I updated the code to handle dynamic changes of the flag (in particular inside the `FlushJob` file). Additionally, this PR includes a new test to demonstrate the capacity of the code to toggle the MemPurge feature on and off, as well as the addition in the `db_stress` module of 2 different mempurge threshold values (0.0 and 1.0) that can be randomly changed with the `set_option_one_in` flag. This is useful to stress test the dynamic changes. **Benchmarking** I will add numbers to prove that there is no performance impact within the next 12 hours. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10011 Reviewed By: pdillinger Differential Revision: D36462357 Pulled By: bjlemaire fbshipit-source-id: 5e3d63bdadf085c0572ecc2349e7dd9729ce1802
1092 lines
43 KiB
C++
1092 lines
43 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/flush_job.h"
|
|
|
|
#include <algorithm>
|
|
#include <cinttypes>
|
|
#include <vector>
|
|
|
|
#include "db/builder.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/event_helpers.h"
|
|
#include "db/log_reader.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/memtable.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/merge_context.h"
|
|
#include "db/range_tombstone_fragmenter.h"
|
|
#include "db/version_set.h"
|
|
#include "file/file_util.h"
|
|
#include "file/filename.h"
|
|
#include "logging/event_logger.h"
|
|
#include "logging/log_buffer.h"
|
|
#include "logging/logging.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/thread_status_util.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/status.h"
|
|
#include "rocksdb/table.h"
|
|
#include "table/merging_iterator.h"
|
|
#include "table/table_builder.h"
|
|
#include "table/two_level_iterator.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/coding.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/stop_watch.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
const char* GetFlushReasonString (FlushReason flush_reason) {
|
|
switch (flush_reason) {
|
|
case FlushReason::kOthers:
|
|
return "Other Reasons";
|
|
case FlushReason::kGetLiveFiles:
|
|
return "Get Live Files";
|
|
case FlushReason::kShutDown:
|
|
return "Shut down";
|
|
case FlushReason::kExternalFileIngestion:
|
|
return "External File Ingestion";
|
|
case FlushReason::kManualCompaction:
|
|
return "Manual Compaction";
|
|
case FlushReason::kWriteBufferManager:
|
|
return "Write Buffer Manager";
|
|
case FlushReason::kWriteBufferFull:
|
|
return "Write Buffer Full";
|
|
case FlushReason::kTest:
|
|
return "Test";
|
|
case FlushReason::kDeleteFiles:
|
|
return "Delete Files";
|
|
case FlushReason::kAutoCompaction:
|
|
return "Auto Compaction";
|
|
case FlushReason::kManualFlush:
|
|
return "Manual Flush";
|
|
case FlushReason::kErrorRecovery:
|
|
return "Error Recovery";
|
|
case FlushReason::kWalFull:
|
|
return "WAL Full";
|
|
default:
|
|
return "Invalid";
|
|
}
|
|
}
|
|
|
|
FlushJob::FlushJob(
|
|
const std::string& dbname, ColumnFamilyData* cfd,
|
|
const ImmutableDBOptions& db_options,
|
|
const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
|
|
const FileOptions& file_options, VersionSet* versions,
|
|
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
|
|
std::vector<SequenceNumber> existing_snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot,
|
|
SnapshotChecker* snapshot_checker, JobContext* job_context,
|
|
LogBuffer* log_buffer, FSDirectory* db_directory,
|
|
FSDirectory* output_file_directory, CompressionType output_compression,
|
|
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
|
|
const bool sync_output_directory, const bool write_manifest,
|
|
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
|
|
const std::string& db_id, const std::string& db_session_id,
|
|
std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
|
|
: dbname_(dbname),
|
|
db_id_(db_id),
|
|
db_session_id_(db_session_id),
|
|
cfd_(cfd),
|
|
db_options_(db_options),
|
|
mutable_cf_options_(mutable_cf_options),
|
|
max_memtable_id_(max_memtable_id),
|
|
file_options_(file_options),
|
|
versions_(versions),
|
|
db_mutex_(db_mutex),
|
|
shutting_down_(shutting_down),
|
|
existing_snapshots_(std::move(existing_snapshots)),
|
|
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
|
|
snapshot_checker_(snapshot_checker),
|
|
job_context_(job_context),
|
|
log_buffer_(log_buffer),
|
|
db_directory_(db_directory),
|
|
output_file_directory_(output_file_directory),
|
|
output_compression_(output_compression),
|
|
stats_(stats),
|
|
event_logger_(event_logger),
|
|
measure_io_stats_(measure_io_stats),
|
|
sync_output_directory_(sync_output_directory),
|
|
write_manifest_(write_manifest),
|
|
edit_(nullptr),
|
|
base_(nullptr),
|
|
pick_memtable_called(false),
|
|
thread_pri_(thread_pri),
|
|
io_tracer_(io_tracer),
|
|
clock_(db_options_.clock),
|
|
full_history_ts_low_(std::move(full_history_ts_low)),
|
|
blob_callback_(blob_callback) {
|
|
// Update the thread status to indicate flush.
|
|
ReportStartedFlush();
|
|
TEST_SYNC_POINT("FlushJob::FlushJob()");
|
|
}
|
|
|
|
FlushJob::~FlushJob() {
|
|
ThreadStatusUtil::ResetThreadStatus();
|
|
}
|
|
|
|
void FlushJob::ReportStartedFlush() {
|
|
ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
|
|
db_options_.enable_thread_tracking);
|
|
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
|
|
ThreadStatusUtil::SetThreadOperationProperty(
|
|
ThreadStatus::COMPACTION_JOB_ID,
|
|
job_context_->job_id);
|
|
IOSTATS_RESET(bytes_written);
|
|
}
|
|
|
|
void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
|
|
uint64_t input_size = 0;
|
|
for (auto* mem : mems) {
|
|
input_size += mem->ApproximateMemoryUsage();
|
|
}
|
|
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_MEMTABLES,
|
|
input_size);
|
|
}
|
|
|
|
void FlushJob::RecordFlushIOStats() {
|
|
RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
|
|
ThreadStatusUtil::IncreaseThreadOperationProperty(
|
|
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
|
|
IOSTATS_RESET(bytes_written);
|
|
}
|
|
void FlushJob::PickMemTable() {
|
|
db_mutex_->AssertHeld();
|
|
assert(!pick_memtable_called);
|
|
pick_memtable_called = true;
|
|
|
|
// Maximum "NextLogNumber" of the memtables to flush.
|
|
// When mempurge feature is turned off, this variable is useless
|
|
// because the memtables are implicitly sorted by increasing order of creation
|
|
// time. Therefore mems_->back()->GetNextLogNumber() is already equal to
|
|
// max_next_log_number. However when Mempurge is on, the memtables are no
|
|
// longer sorted by increasing order of creation time. Therefore this variable
|
|
// becomes necessary because mems_->back()->GetNextLogNumber() is no longer
|
|
// necessarily equal to max_next_log_number.
|
|
uint64_t max_next_log_number = 0;
|
|
|
|
// Save the contents of the earliest memtable as a new Table
|
|
cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
|
|
&max_next_log_number);
|
|
if (mems_.empty()) {
|
|
return;
|
|
}
|
|
|
|
ReportFlushInputSize(mems_);
|
|
|
|
// entries mems are (implicitly) sorted in ascending order by their created
|
|
// time. We will use the first memtable's `edit` to keep the meta info for
|
|
// this flush.
|
|
MemTable* m = mems_[0];
|
|
edit_ = m->GetEdits();
|
|
edit_->SetPrevLogNumber(0);
|
|
// SetLogNumber(log_num) indicates logs with number smaller than log_num
|
|
// will no longer be picked up for recovery.
|
|
edit_->SetLogNumber(max_next_log_number);
|
|
edit_->SetColumnFamily(cfd_->GetID());
|
|
|
|
// path 0 for level 0 file.
|
|
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
|
|
|
base_ = cfd_->current();
|
|
base_->Ref(); // it is likely that we do not need this reference
|
|
}
|
|
|
|
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
|
|
bool* switched_to_mempurge) {
|
|
TEST_SYNC_POINT("FlushJob::Start");
|
|
db_mutex_->AssertHeld();
|
|
assert(pick_memtable_called);
|
|
// Mempurge threshold can be dynamically changed.
|
|
// For sake of consistency, mempurge_threshold is
|
|
// saved locally to maintain consistency in each
|
|
// FlushJob::Run call.
|
|
double mempurge_threshold =
|
|
mutable_cf_options_.experimental_mempurge_threshold;
|
|
|
|
AutoThreadOperationStageUpdater stage_run(
|
|
ThreadStatus::STAGE_FLUSH_RUN);
|
|
if (mems_.empty()) {
|
|
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
|
|
cfd_->GetName().c_str());
|
|
return Status::OK();
|
|
}
|
|
|
|
// I/O measurement variables
|
|
PerfLevel prev_perf_level = PerfLevel::kEnableTime;
|
|
uint64_t prev_write_nanos = 0;
|
|
uint64_t prev_fsync_nanos = 0;
|
|
uint64_t prev_range_sync_nanos = 0;
|
|
uint64_t prev_prepare_write_nanos = 0;
|
|
uint64_t prev_cpu_write_nanos = 0;
|
|
uint64_t prev_cpu_read_nanos = 0;
|
|
if (measure_io_stats_) {
|
|
prev_perf_level = GetPerfLevel();
|
|
SetPerfLevel(PerfLevel::kEnableTime);
|
|
prev_write_nanos = IOSTATS(write_nanos);
|
|
prev_fsync_nanos = IOSTATS(fsync_nanos);
|
|
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
|
|
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
|
|
prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
|
|
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
|
|
}
|
|
Status mempurge_s = Status::NotFound("No MemPurge.");
|
|
if ((mempurge_threshold > 0.0) &&
|
|
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
|
|
(!mems_.empty()) && MemPurgeDecider(mempurge_threshold) &&
|
|
!(db_options_.atomic_flush)) {
|
|
cfd_->SetMempurgeUsed();
|
|
mempurge_s = MemPurge();
|
|
if (!mempurge_s.ok()) {
|
|
// Mempurge is typically aborted when the output
|
|
// bytes cannot be contained onto a single output memtable.
|
|
if (mempurge_s.IsAborted()) {
|
|
ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
|
|
mempurge_s.ToString().c_str());
|
|
} else {
|
|
// However the mempurge process can also fail for
|
|
// other reasons (eg: new_mem->Add() fails).
|
|
ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
|
|
mempurge_s.ToString().c_str());
|
|
}
|
|
} else {
|
|
if (switched_to_mempurge) {
|
|
*switched_to_mempurge = true;
|
|
} else {
|
|
// The mempurge process was successful, but no switch_to_mempurge
|
|
// pointer provided so no way to propagate the state of flush job.
|
|
ROCKS_LOG_WARN(db_options_.info_log,
|
|
"Mempurge process succeeded"
|
|
"but no 'switched_to_mempurge' ptr provided.\n");
|
|
}
|
|
}
|
|
}
|
|
Status s;
|
|
if (mempurge_s.ok()) {
|
|
base_->Unref();
|
|
s = Status::OK();
|
|
} else {
|
|
// This will release and re-acquire the mutex.
|
|
s = WriteLevel0Table();
|
|
}
|
|
|
|
if (s.ok() && cfd_->IsDropped()) {
|
|
s = Status::ColumnFamilyDropped("Column family dropped during compaction");
|
|
}
|
|
if ((s.ok() || s.IsColumnFamilyDropped()) &&
|
|
shutting_down_->load(std::memory_order_acquire)) {
|
|
s = Status::ShutdownInProgress("Database shutdown");
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
|
|
} else if (write_manifest_) {
|
|
TEST_SYNC_POINT("FlushJob::InstallResults");
|
|
// Replace immutable memtable with the generated Table
|
|
s = cfd_->imm()->TryInstallMemtableFlushResults(
|
|
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
|
|
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
|
|
log_buffer_, &committed_flush_jobs_info_,
|
|
!(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
|
|
but 'false' if mempurge successful: no new min log number
|
|
or new level 0 file path to write to manifest. */);
|
|
}
|
|
|
|
if (s.ok() && file_meta != nullptr) {
|
|
*file_meta = meta_;
|
|
}
|
|
RecordFlushIOStats();
|
|
|
|
// When measure_io_stats_ is true, the default 512 bytes is not enough.
|
|
auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
|
|
stream << "job" << job_context_->job_id << "event"
|
|
<< "flush_finished";
|
|
stream << "output_compression"
|
|
<< CompressionTypeToString(output_compression_);
|
|
stream << "lsm_state";
|
|
stream.StartArray();
|
|
auto vstorage = cfd_->current()->storage_info();
|
|
for (int level = 0; level < vstorage->num_levels(); ++level) {
|
|
stream << vstorage->NumLevelFiles(level);
|
|
}
|
|
stream.EndArray();
|
|
|
|
const auto& blob_files = vstorage->GetBlobFiles();
|
|
if (!blob_files.empty()) {
|
|
assert(blob_files.front());
|
|
stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
|
|
|
|
assert(blob_files.back());
|
|
stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
|
|
}
|
|
|
|
stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
|
|
|
|
if (measure_io_stats_) {
|
|
if (prev_perf_level != PerfLevel::kEnableTime) {
|
|
SetPerfLevel(prev_perf_level);
|
|
}
|
|
stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
|
|
stream << "file_range_sync_nanos"
|
|
<< (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
|
|
stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
|
|
stream << "file_prepare_write_nanos"
|
|
<< (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
|
|
stream << "file_cpu_write_nanos"
|
|
<< (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
|
|
stream << "file_cpu_read_nanos"
|
|
<< (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
void FlushJob::Cancel() {
|
|
db_mutex_->AssertHeld();
|
|
assert(base_ != nullptr);
|
|
base_->Unref();
|
|
}
|
|
|
|
Status FlushJob::MemPurge() {
|
|
Status s;
|
|
db_mutex_->AssertHeld();
|
|
db_mutex_->Unlock();
|
|
assert(!mems_.empty());
|
|
|
|
// Measure purging time.
|
|
const uint64_t start_micros = clock_->NowMicros();
|
|
const uint64_t start_cpu_micros = clock_->CPUMicros();
|
|
|
|
MemTable* new_mem = nullptr;
|
|
// For performance/log investigation purposes:
|
|
// look at how much useful payload we harvest in the new_mem.
|
|
// This value is then printed to the DB log.
|
|
double new_mem_capacity = 0.0;
|
|
|
|
// Create two iterators, one for the memtable data (contains
|
|
// info from puts + deletes), and one for the memtable
|
|
// Range Tombstones (from DeleteRanges).
|
|
ReadOptions ro;
|
|
ro.total_order_seek = true;
|
|
Arena arena;
|
|
std::vector<InternalIterator*> memtables;
|
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
|
range_del_iters;
|
|
for (MemTable* m : mems_) {
|
|
memtables.push_back(m->NewIterator(ro, &arena));
|
|
auto* range_del_iter = m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
|
|
if (range_del_iter != nullptr) {
|
|
range_del_iters.emplace_back(range_del_iter);
|
|
}
|
|
}
|
|
|
|
assert(!memtables.empty());
|
|
SequenceNumber first_seqno = kMaxSequenceNumber;
|
|
SequenceNumber earliest_seqno = kMaxSequenceNumber;
|
|
// Pick first and earliest seqno as min of all first_seqno
|
|
// and earliest_seqno of the mempurged memtables.
|
|
for (const auto& mem : mems_) {
|
|
first_seqno = mem->GetFirstSequenceNumber() < first_seqno
|
|
? mem->GetFirstSequenceNumber()
|
|
: first_seqno;
|
|
earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno
|
|
? mem->GetEarliestSequenceNumber()
|
|
: earliest_seqno;
|
|
}
|
|
|
|
ScopedArenaIterator iter(
|
|
NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
|
|
static_cast<int>(memtables.size()), &arena));
|
|
|
|
auto* ioptions = cfd_->ioptions();
|
|
|
|
// Place iterator at the First (meaning most recent) key node.
|
|
iter->SeekToFirst();
|
|
|
|
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
|
|
new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
|
|
existing_snapshots_));
|
|
for (auto& rd_iter : range_del_iters) {
|
|
range_del_agg->AddTombstones(std::move(rd_iter));
|
|
}
|
|
|
|
// If there is valid data in the memtable,
|
|
// or at least range tombstones, copy over the info
|
|
// to the new memtable.
|
|
if (iter->Valid() || !range_del_agg->IsEmpty()) {
|
|
// MaxSize is the size of a memtable.
|
|
size_t maxSize = mutable_cf_options_.write_buffer_size;
|
|
std::unique_ptr<CompactionFilter> compaction_filter;
|
|
if (ioptions->compaction_filter_factory != nullptr &&
|
|
ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
|
|
TableFileCreationReason::kFlush)) {
|
|
CompactionFilter::Context ctx;
|
|
ctx.is_full_compaction = false;
|
|
ctx.is_manual_compaction = false;
|
|
ctx.column_family_id = cfd_->GetID();
|
|
ctx.reason = TableFileCreationReason::kFlush;
|
|
compaction_filter =
|
|
ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
|
|
if (compaction_filter != nullptr &&
|
|
!compaction_filter->IgnoreSnapshots()) {
|
|
s = Status::NotSupported(
|
|
"CompactionFilter::IgnoreSnapshots() = false is not supported "
|
|
"anymore.");
|
|
return s;
|
|
}
|
|
}
|
|
|
|
new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
|
|
mutable_cf_options_, cfd_->write_buffer_mgr(),
|
|
earliest_seqno, cfd_->GetID());
|
|
assert(new_mem != nullptr);
|
|
|
|
Env* env = db_options_.env;
|
|
assert(env);
|
|
MergeHelper merge(
|
|
env, (cfd_->internal_comparator()).user_comparator(),
|
|
(ioptions->merge_operator).get(), compaction_filter.get(),
|
|
ioptions->logger, true /* internal key corruption is not ok */,
|
|
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
|
|
snapshot_checker_);
|
|
assert(job_context_);
|
|
SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence();
|
|
const std::atomic<bool> kManualCompactionCanceledFalse{false};
|
|
CompactionIterator c_iter(
|
|
iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
|
|
kMaxSequenceNumber, &existing_snapshots_,
|
|
earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_,
|
|
env, ShouldReportDetailedTime(env, ioptions->stats),
|
|
true /* internal key corruption is not ok */, range_del_agg.get(),
|
|
nullptr, ioptions->allow_data_in_errors,
|
|
ioptions->enforce_single_del_contracts,
|
|
/*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
|
|
/*compaction=*/nullptr, compaction_filter.get(),
|
|
/*shutting_down=*/nullptr, ioptions->info_log,
|
|
&(cfd_->GetFullHistoryTsLow()));
|
|
|
|
// Set earliest sequence number in the new memtable
|
|
// to be equal to the earliest sequence number of the
|
|
// memtable being flushed (See later if there is a need
|
|
// to update this number!).
|
|
new_mem->SetEarliestSequenceNumber(earliest_seqno);
|
|
// Likewise for first seq number.
|
|
new_mem->SetFirstSequenceNumber(first_seqno);
|
|
SequenceNumber new_first_seqno = kMaxSequenceNumber;
|
|
|
|
c_iter.SeekToFirst();
|
|
|
|
// Key transfer
|
|
for (; c_iter.Valid(); c_iter.Next()) {
|
|
const ParsedInternalKey ikey = c_iter.ikey();
|
|
const Slice value = c_iter.value();
|
|
new_first_seqno =
|
|
ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
|
|
|
|
// Should we update "OldestKeyTime" ???? -> timestamp appear
|
|
// to still be an "experimental" feature.
|
|
s = new_mem->Add(
|
|
ikey.sequence, ikey.type, ikey.user_key, value,
|
|
nullptr, // KV protection info set as nullptr since it
|
|
// should only be useful for the first add to
|
|
// the original memtable.
|
|
false, // : allow concurrent_memtable_writes_
|
|
// Not seen as necessary for now.
|
|
nullptr, // get_post_process_info(m) must be nullptr
|
|
// when concurrent_memtable_writes is switched off.
|
|
nullptr); // hint, only used when concurrent_memtable_writes_
|
|
// is switched on.
|
|
if (!s.ok()) {
|
|
break;
|
|
}
|
|
|
|
// If new_mem has size greater than maxSize,
|
|
// then rollback to regular flush operation,
|
|
// and destroy new_mem.
|
|
if (new_mem->ApproximateMemoryUsage() > maxSize) {
|
|
s = Status::Aborted("Mempurge filled more than one memtable.");
|
|
new_mem_capacity = 1.0;
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Check status and propagate
|
|
// potential error status from c_iter
|
|
if (!s.ok()) {
|
|
c_iter.status().PermitUncheckedError();
|
|
} else if (!c_iter.status().ok()) {
|
|
s = c_iter.status();
|
|
}
|
|
|
|
// Range tombstone transfer.
|
|
if (s.ok()) {
|
|
auto range_del_it = range_del_agg->NewIterator();
|
|
for (range_del_it->SeekToFirst(); range_del_it->Valid();
|
|
range_del_it->Next()) {
|
|
auto tombstone = range_del_it->Tombstone();
|
|
new_first_seqno =
|
|
tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
|
|
s = new_mem->Add(
|
|
tombstone.seq_, // Sequence number
|
|
kTypeRangeDeletion, // KV type
|
|
tombstone.start_key_, // Key is start key.
|
|
tombstone.end_key_, // Value is end key.
|
|
nullptr, // KV protection info set as nullptr since it
|
|
// should only be useful for the first add to
|
|
// the original memtable.
|
|
false, // : allow concurrent_memtable_writes_
|
|
// Not seen as necessary for now.
|
|
nullptr, // get_post_process_info(m) must be nullptr
|
|
// when concurrent_memtable_writes is switched off.
|
|
nullptr); // hint, only used when concurrent_memtable_writes_
|
|
// is switched on.
|
|
|
|
if (!s.ok()) {
|
|
break;
|
|
}
|
|
|
|
// If new_mem has size greater than maxSize,
|
|
// then rollback to regular flush operation,
|
|
// and destroy new_mem.
|
|
if (new_mem->ApproximateMemoryUsage() > maxSize) {
|
|
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
|
|
new_mem_capacity = 1.0;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// If everything happened smoothly and new_mem contains valid data,
|
|
// decide if it is flushed to storage or kept in the imm()
|
|
// memtable list (memory).
|
|
if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
|
|
// Rectify the first sequence number, which (unlike the earliest seq
|
|
// number) needs to be present in the new memtable.
|
|
new_mem->SetFirstSequenceNumber(new_first_seqno);
|
|
|
|
// The new_mem is added to the list of immutable memtables
|
|
// only if it filled at less than 100% capacity and isn't flagged
|
|
// as in need of being flushed.
|
|
if (new_mem->ApproximateMemoryUsage() < maxSize &&
|
|
!(new_mem->ShouldFlushNow())) {
|
|
db_mutex_->Lock();
|
|
uint64_t new_mem_id = mems_[0]->GetID();
|
|
|
|
new_mem->SetID(new_mem_id);
|
|
new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
|
|
|
|
// This addition will not trigger another flush, because
|
|
// we do not call SchedulePendingFlush().
|
|
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
|
|
new_mem->Ref();
|
|
#ifndef ROCKSDB_LITE
|
|
// Piggyback FlushJobInfo on the first flushed memtable.
|
|
db_mutex_->AssertHeld();
|
|
meta_.fd.file_size = 0;
|
|
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
|
|
#endif // !ROCKSDB_LITE
|
|
db_mutex_->Unlock();
|
|
} else {
|
|
s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
|
|
new_mem_capacity = 1.0;
|
|
if (new_mem) {
|
|
job_context_->memtables_to_free.push_back(new_mem);
|
|
}
|
|
}
|
|
} else {
|
|
// In this case, the newly allocated new_mem is empty.
|
|
assert(new_mem != nullptr);
|
|
job_context_->memtables_to_free.push_back(new_mem);
|
|
}
|
|
}
|
|
|
|
// Reacquire the mutex for WriteLevel0 function.
|
|
db_mutex_->Lock();
|
|
|
|
// If mempurge successful, don't write input tables to level0,
|
|
// but write any full output table to level0.
|
|
if (s.ok()) {
|
|
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
|
|
} else {
|
|
TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
|
|
}
|
|
const uint64_t micros = clock_->NowMicros() - start_micros;
|
|
const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Mempurge lasted %" PRIu64
|
|
" microseconds, and %" PRIu64
|
|
" cpu "
|
|
"microseconds. Status is %s ok. Perc capacity: %f\n",
|
|
cfd_->GetName().c_str(), job_context_->job_id, micros,
|
|
cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
|
|
|
|
return s;
|
|
}
|
|
|
|
bool FlushJob::MemPurgeDecider(double threshold) {
|
|
// Never trigger mempurge if threshold is not a strictly positive value.
|
|
if (!(threshold > 0.0)) {
|
|
return false;
|
|
}
|
|
if (threshold > (1.0 * mems_.size())) {
|
|
return true;
|
|
}
|
|
// Payload and useful_payload (in bytes).
|
|
// The useful payload ratio of a given MemTable
|
|
// is estimated to be useful_payload/payload.
|
|
uint64_t payload = 0, useful_payload = 0, entry_size = 0;
|
|
|
|
// Local variables used repetitively inside the for-loop
|
|
// when iterating over the sampled entries.
|
|
Slice key_slice, value_slice;
|
|
ParsedInternalKey res;
|
|
SnapshotImpl min_snapshot;
|
|
std::string vget;
|
|
Status mget_s, parse_s;
|
|
MergeContext merge_context;
|
|
SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
|
|
min_seqno_snapshot = 0;
|
|
bool get_res, can_be_useful_payload, not_in_next_mems;
|
|
|
|
// If estimated_useful_payload is > threshold,
|
|
// then flush to storage, else MemPurge.
|
|
double estimated_useful_payload = 0.0;
|
|
// Cochran formula for determining sample size.
|
|
// 95% confidence interval, 7% precision.
|
|
// n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
|
|
double n0 = 196.0;
|
|
ReadOptions ro;
|
|
ro.total_order_seek = true;
|
|
|
|
// Iterate over each memtable of the set.
|
|
for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
|
|
mem_iter++) {
|
|
MemTable* mt = *mem_iter;
|
|
|
|
// Else sample from the table.
|
|
uint64_t nentries = mt->num_entries();
|
|
// Corrected Cochran formula for small populations
|
|
// (converges to n0 for large populations).
|
|
uint64_t target_sample_size =
|
|
static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
|
|
std::unordered_set<const char*> sentries = {};
|
|
// Populate sample entries set.
|
|
mt->UniqueRandomSample(target_sample_size, &sentries);
|
|
|
|
// Estimate the garbage ratio by comparing if
|
|
// each sample corresponds to a valid entry.
|
|
for (const char* ss : sentries) {
|
|
key_slice = GetLengthPrefixedSlice(ss);
|
|
parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
|
|
if (!parse_s.ok()) {
|
|
ROCKS_LOG_WARN(db_options_.info_log,
|
|
"Memtable Decider: ParseInternalKey did not parse "
|
|
"key_slice %s successfully.",
|
|
key_slice.data());
|
|
}
|
|
|
|
// Size of the entry is "key size (+ value size if KV entry)"
|
|
entry_size = key_slice.size();
|
|
if (res.type == kTypeValue) {
|
|
value_slice =
|
|
GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
|
|
entry_size += value_slice.size();
|
|
}
|
|
|
|
// Count entry bytes as payload.
|
|
payload += entry_size;
|
|
|
|
LookupKey lkey(res.user_key, kMaxSequenceNumber);
|
|
|
|
// Paranoia: zero out these values just in case.
|
|
max_covering_tombstone_seq = 0;
|
|
sqno = 0;
|
|
|
|
// Pick the oldest existing snapshot that is more recent
|
|
// than the sequence number of the sampled entry.
|
|
min_seqno_snapshot = kMaxSequenceNumber;
|
|
for (SequenceNumber seq_num : existing_snapshots_) {
|
|
if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
|
|
min_seqno_snapshot = seq_num;
|
|
}
|
|
}
|
|
min_snapshot.number_ = min_seqno_snapshot;
|
|
ro.snapshot =
|
|
min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
|
|
|
|
// Estimate if the sample entry is valid or not.
|
|
get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
|
|
&max_covering_tombstone_seq, &sqno, ro);
|
|
if (!get_res) {
|
|
ROCKS_LOG_WARN(
|
|
db_options_.info_log,
|
|
"Memtable Get returned false when Get(sampled entry). "
|
|
"Yet each sample entry should exist somewhere in the memtable, "
|
|
"unrelated to whether it has been deleted or not.");
|
|
}
|
|
|
|
// TODO(bjlemaire): evaluate typeMerge.
|
|
// This is where the sampled entry is estimated to be
|
|
// garbage or not. Note that this is a garbage *estimation*
|
|
// because we do not include certain items such as
|
|
// CompactionFitlers triggered at flush, or if the same delete
|
|
// has been inserted twice or more in the memtable.
|
|
|
|
// Evaluate if the entry can be useful payload
|
|
// Situation #1: entry is a KV entry, was found in the memtable mt
|
|
// and the sequence numbers match.
|
|
can_be_useful_payload = (res.type == kTypeValue) && get_res &&
|
|
mget_s.ok() && (sqno == res.sequence);
|
|
|
|
// Situation #2: entry is a delete entry, was found in the memtable mt
|
|
// (because gres==true) and no valid KV entry is found.
|
|
// (note: duplicate delete entries are also taken into
|
|
// account here, because the sequence number 'sqno'
|
|
// in memtable->Get(&sqno) operation is set to be equal
|
|
// to the most recent delete entry as well).
|
|
can_be_useful_payload |=
|
|
((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
|
|
mget_s.IsNotFound() && get_res && (sqno == res.sequence);
|
|
|
|
// If there is a chance that the entry is useful payload
|
|
// Verify that the entry does not appear in the following memtables
|
|
// (memtables with greater memtable ID/larger sequence numbers).
|
|
if (can_be_useful_payload) {
|
|
not_in_next_mems = true;
|
|
for (auto next_mem_iter = mem_iter + 1;
|
|
next_mem_iter != std::end(mems_); next_mem_iter++) {
|
|
if ((*next_mem_iter)
|
|
->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
|
|
&max_covering_tombstone_seq, &sqno, ro)) {
|
|
not_in_next_mems = false;
|
|
break;
|
|
}
|
|
}
|
|
if (not_in_next_mems) {
|
|
useful_payload += entry_size;
|
|
}
|
|
}
|
|
}
|
|
if (payload > 0) {
|
|
// We use the estimated useful payload ratio to
|
|
// evaluate how many of the memtable bytes are useful bytes.
|
|
estimated_useful_payload +=
|
|
(mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
|
|
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"Mempurge sampling [CF %s] - found garbage ratio from "
|
|
"sampling: %f. Threshold is %f\n",
|
|
cfd_->GetName().c_str(),
|
|
(payload - useful_payload) * 1.0 / payload, threshold);
|
|
} else {
|
|
ROCKS_LOG_WARN(db_options_.info_log,
|
|
"Mempurge sampling: null payload measured, and collected "
|
|
"sample size is %zu\n.",
|
|
sentries.size());
|
|
}
|
|
}
|
|
// We convert the total number of useful payload bytes
|
|
// into the proportion of memtable necessary to store all these bytes.
|
|
// We compare this proportion with the threshold value.
|
|
return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
|
|
threshold);
|
|
}
|
|
|
|
Status FlushJob::WriteLevel0Table() {
|
|
AutoThreadOperationStageUpdater stage_updater(
|
|
ThreadStatus::STAGE_FLUSH_WRITE_L0);
|
|
db_mutex_->AssertHeld();
|
|
const uint64_t start_micros = clock_->NowMicros();
|
|
const uint64_t start_cpu_micros = clock_->CPUMicros();
|
|
Status s;
|
|
|
|
std::vector<BlobFileAddition> blob_file_additions;
|
|
|
|
{
|
|
auto write_hint = cfd_->CalculateSSTWriteHint(0);
|
|
Env::IOPriority io_priority = GetRateLimiterPriorityForWrite();
|
|
db_mutex_->Unlock();
|
|
if (log_buffer_) {
|
|
log_buffer_->FlushBufferToLog();
|
|
}
|
|
// memtables and range_del_iters store internal iterators over each data
|
|
// memtable and its associated range deletion memtable, respectively, at
|
|
// corresponding indexes.
|
|
std::vector<InternalIterator*> memtables;
|
|
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
|
range_del_iters;
|
|
ReadOptions ro;
|
|
ro.total_order_seek = true;
|
|
Arena arena;
|
|
uint64_t total_num_entries = 0, total_num_deletes = 0;
|
|
uint64_t total_data_size = 0;
|
|
size_t total_memory_usage = 0;
|
|
// Used for testing:
|
|
uint64_t mems_size = mems_.size();
|
|
(void)mems_size; // avoids unused variable error when
|
|
// TEST_SYNC_POINT_CALLBACK not used.
|
|
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
|
|
&mems_size);
|
|
assert(job_context_);
|
|
for (MemTable* m : mems_) {
|
|
ROCKS_LOG_INFO(
|
|
db_options_.info_log,
|
|
"[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
|
|
cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
|
|
memtables.push_back(m->NewIterator(ro, &arena));
|
|
auto* range_del_iter =
|
|
m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
|
|
if (range_del_iter != nullptr) {
|
|
range_del_iters.emplace_back(range_del_iter);
|
|
}
|
|
total_num_entries += m->num_entries();
|
|
total_num_deletes += m->num_deletes();
|
|
total_data_size += m->get_data_size();
|
|
total_memory_usage += m->ApproximateMemoryUsage();
|
|
}
|
|
|
|
event_logger_->Log() << "job" << job_context_->job_id << "event"
|
|
<< "flush_started"
|
|
<< "num_memtables" << mems_.size() << "num_entries"
|
|
<< total_num_entries << "num_deletes"
|
|
<< total_num_deletes << "total_data_size"
|
|
<< total_data_size << "memory_usage"
|
|
<< total_memory_usage << "flush_reason"
|
|
<< GetFlushReasonString(cfd_->GetFlushReason());
|
|
|
|
{
|
|
ScopedArenaIterator iter(
|
|
NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
|
|
static_cast<int>(memtables.size()), &arena));
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
meta_.fd.GetNumber());
|
|
|
|
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
|
|
&output_compression_);
|
|
int64_t _current_time = 0;
|
|
auto status = clock_->GetCurrentTime(&_current_time);
|
|
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
|
|
if (!status.ok()) {
|
|
ROCKS_LOG_WARN(
|
|
db_options_.info_log,
|
|
"Failed to get current time to populate creation_time property. "
|
|
"Status: %s",
|
|
status.ToString().c_str());
|
|
}
|
|
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
|
|
|
uint64_t oldest_key_time =
|
|
mems_.front()->ApproximateOldestKeyTime();
|
|
|
|
// It's not clear whether oldest_key_time is always available. In case
|
|
// it is not available, use current_time.
|
|
uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);
|
|
|
|
TEST_SYNC_POINT_CALLBACK(
|
|
"FlushJob::WriteLevel0Table:oldest_ancester_time",
|
|
&oldest_ancester_time);
|
|
meta_.oldest_ancester_time = oldest_ancester_time;
|
|
|
|
meta_.file_creation_time = current_time;
|
|
|
|
uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
|
|
CompactionStyle::kCompactionStyleFIFO)
|
|
? current_time
|
|
: meta_.oldest_ancester_time;
|
|
|
|
uint64_t num_input_entries = 0;
|
|
uint64_t memtable_payload_bytes = 0;
|
|
uint64_t memtable_garbage_bytes = 0;
|
|
IOStatus io_s;
|
|
const std::string* const full_history_ts_low =
|
|
(full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
|
|
TableBuilderOptions tboptions(
|
|
*cfd_->ioptions(), mutable_cf_options_, cfd_->internal_comparator(),
|
|
cfd_->int_tbl_prop_collector_factories(), output_compression_,
|
|
mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
|
|
0 /* level */, false /* is_bottommost */,
|
|
TableFileCreationReason::kFlush, creation_time, oldest_key_time,
|
|
current_time, db_id_, db_session_id_, 0 /* target_file_size */,
|
|
meta_.fd.GetNumber());
|
|
const SequenceNumber job_snapshot_seq =
|
|
job_context_->GetJobSnapshotSequence();
|
|
s = BuildTable(
|
|
dbname_, versions_, db_options_, tboptions, file_options_,
|
|
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
|
|
&blob_file_additions, existing_snapshots_,
|
|
earliest_write_conflict_snapshot_, job_snapshot_seq,
|
|
snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
|
|
cfd_->internal_stats(), &io_s, io_tracer_,
|
|
BlobFileCreationReason::kFlush, event_logger_, job_context_->job_id,
|
|
io_priority, &table_properties_, write_hint, full_history_ts_low,
|
|
blob_callback_, &num_input_entries, &memtable_payload_bytes,
|
|
&memtable_garbage_bytes);
|
|
// TODO: Cleanup io_status in BuildTable and table builders
|
|
assert(!s.ok() || io_s.ok());
|
|
io_s.PermitUncheckedError();
|
|
if (num_input_entries != total_num_entries && s.ok()) {
|
|
std::string msg = "Expected " + std::to_string(total_num_entries) +
|
|
" entries in memtables, but read " +
|
|
std::to_string(num_input_entries);
|
|
ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
msg.c_str());
|
|
if (db_options_.flush_verify_memtable_count) {
|
|
s = Status::Corruption(msg);
|
|
}
|
|
}
|
|
if (tboptions.reason == TableFileCreationReason::kFlush) {
|
|
TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
|
|
RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
|
|
memtable_payload_bytes);
|
|
RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
|
|
memtable_garbage_bytes);
|
|
}
|
|
LogFlush(db_options_.info_log);
|
|
}
|
|
ROCKS_LOG_BUFFER(log_buffer_,
|
|
"[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
|
|
" bytes %s"
|
|
"%s",
|
|
cfd_->GetName().c_str(), job_context_->job_id,
|
|
meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
|
|
s.ToString().c_str(),
|
|
meta_.marked_for_compaction ? " (needs compaction)" : "");
|
|
|
|
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
|
|
s = output_file_directory_->FsyncWithDirOptions(
|
|
IOOptions(), nullptr,
|
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
|
}
|
|
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
|
|
db_mutex_->Lock();
|
|
}
|
|
base_->Unref();
|
|
|
|
// Note that if file_size is zero, the file has been deleted and
|
|
// should not be added to the manifest.
|
|
const bool has_output = meta_.fd.GetFileSize() > 0;
|
|
|
|
if (s.ok() && has_output) {
|
|
TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
|
|
// if we have more than 1 background thread, then we cannot
|
|
// insert files directly into higher levels because some other
|
|
// threads could be concurrently producing compacted files for
|
|
// that key range.
|
|
// Add file to L0
|
|
edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
|
|
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
|
|
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
|
|
meta_.marked_for_compaction, meta_.temperature,
|
|
meta_.oldest_blob_file_number, meta_.oldest_ancester_time,
|
|
meta_.file_creation_time, meta_.file_checksum,
|
|
meta_.file_checksum_func_name, meta_.min_timestamp,
|
|
meta_.max_timestamp, meta_.unique_id);
|
|
|
|
edit_->SetBlobFileAdditions(std::move(blob_file_additions));
|
|
}
|
|
#ifndef ROCKSDB_LITE
|
|
// Piggyback FlushJobInfo on the first first flushed memtable.
|
|
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
// Note that here we treat flush as level 0 compaction in internal stats
|
|
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
|
|
const uint64_t micros = clock_->NowMicros() - start_micros;
|
|
const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
|
|
stats.micros = micros;
|
|
stats.cpu_micros = cpu_micros;
|
|
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"[%s] [JOB %d] Flush lasted %" PRIu64
|
|
" microseconds, and %" PRIu64 " cpu microseconds.\n",
|
|
cfd_->GetName().c_str(), job_context_->job_id, micros,
|
|
cpu_micros);
|
|
|
|
if (has_output) {
|
|
stats.bytes_written = meta_.fd.GetFileSize();
|
|
stats.num_output_files = 1;
|
|
}
|
|
|
|
const auto& blobs = edit_->GetBlobFileAdditions();
|
|
for (const auto& blob : blobs) {
|
|
stats.bytes_written_blob += blob.GetTotalBlobBytes();
|
|
}
|
|
|
|
stats.num_output_files_blob = static_cast<int>(blobs.size());
|
|
|
|
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
|
|
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
|
|
cfd_->internal_stats()->AddCFStats(
|
|
InternalStats::BYTES_FLUSHED,
|
|
stats.bytes_written + stats.bytes_written_blob);
|
|
RecordFlushIOStats();
|
|
|
|
return s;
|
|
}
|
|
|
|
Env::IOPriority FlushJob::GetRateLimiterPriorityForWrite() {
|
|
if (versions_ && versions_->GetColumnFamilySet() &&
|
|
versions_->GetColumnFamilySet()->write_controller()) {
|
|
WriteController* write_controller =
|
|
versions_->GetColumnFamilySet()->write_controller();
|
|
if (write_controller->IsStopped() || write_controller->NeedsDelay()) {
|
|
return Env::IO_USER;
|
|
}
|
|
}
|
|
|
|
return Env::IO_HIGH;
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
|
|
db_mutex_->AssertHeld();
|
|
std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
|
|
info->cf_id = cfd_->GetID();
|
|
info->cf_name = cfd_->GetName();
|
|
|
|
const uint64_t file_number = meta_.fd.GetNumber();
|
|
info->file_path =
|
|
MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
|
|
info->file_number = file_number;
|
|
info->oldest_blob_file_number = meta_.oldest_blob_file_number;
|
|
info->thread_id = db_options_.env->GetThreadID();
|
|
info->job_id = job_context_->job_id;
|
|
info->smallest_seqno = meta_.fd.smallest_seqno;
|
|
info->largest_seqno = meta_.fd.largest_seqno;
|
|
info->table_properties = table_properties_;
|
|
info->flush_reason = cfd_->GetFlushReason();
|
|
info->blob_compression_type = mutable_cf_options_.blob_compression_type;
|
|
|
|
// Update BlobFilesInfo.
|
|
for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
|
|
BlobFileAdditionInfo blob_file_addition_info(
|
|
BlobFileName(cfd_->ioptions()->cf_paths.front().path,
|
|
blob_file.GetBlobFileNumber()) /*blob_file_path*/,
|
|
blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
|
|
blob_file.GetTotalBlobBytes());
|
|
info->blob_file_addition_infos.emplace_back(
|
|
std::move(blob_file_addition_info));
|
|
}
|
|
return info;
|
|
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|