rocksdb/db/compaction/subcompaction_state.h
Zichen Zhu 8860fc902a Support subcmpct using reserved resources for round-robin priority (#10341)
Summary:
Earlier implementation of round-robin priority can only pick one file at a time and disallows parallel compactions within the same level. In this PR, round-robin compaction policy will expand towards more input files with respecting some additional constraints, which are summarized as follows:
 * Constraint 1: We can only pick consecutive files
   - Constraint 1a: When a file is being compacted (or some input files are being compacted after expanding), we cannot choose it and have to stop choosing more files
   - Constraint 1b: When we reach the last file (with the largest keys), we cannot choose more files (the next file will be the first one with small keys)
 * Constraint 2: We should ensure the total compaction bytes (including the overlapped files from the next level) is no more than `mutable_cf_options_.max_compaction_bytes`
 * Constraint 3: We try our best to pick as many files as possible so that the post-compaction level size can be just less than `MaxBytesForLevel(start_level_)`
 * Constraint 4: If trivial move is allowed, we reuse the logic of `TryNonL0TrivialMove()` instead of expanding files with Constraint 3

More details can be found in `LevelCompactionBuilder::SetupOtherFilesWithRoundRobinExpansion()`.

The above optimization accelerates the process of moving the compaction cursor, in which the write-amp can be further reduced. While a large compaction may lead to high write stall, we break this large compaction into several subcompactions **regardless of** the `max_subcompactions` limit.  The number of subcompactions for round-robin compaction priority is determined through the following steps:
* Step 1: Initialized against `max_output_file_limit`, the number of input files in the start level, and also the range size limit `ranges.size()`
* Step 2: Call `AcquireSubcompactionResources()`when max subcompactions is not sufficient, but we may or may not obtain desired resources, additional number of resources is stored in `extra_num_subcompaction_threads_reserved_`). Subcompaction limit is changed and update `num_planned_subcompactions` with `GetSubcompactionLimit()`
* Step 3: Call `ShrinkSubcompactionResources()` to ensure extra resources can be released (extra resources may exist for round-robin compaction when the number of actual number of subcompactions is less than the number of planned subcompactions)

More details can be found in `CompactionJob::AcquireSubcompactionResources()`,`CompactionJob::ShrinkSubcompactionResources()`, and `CompactionJob::ReleaseSubcompactionResources()`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10341

Test Plan: Add `CompactionPriMultipleFilesRoundRobin[1-3]` unit test in `compaction_picker_test.cc` and `RoundRobinSubcompactionsAgainstResources.SubcompactionsUsingResources/[0-4]`, `RoundRobinSubcompactionsAgainstPressureToken.PressureTokenTest/[0-1]` in `db_compaction_test.cc`

Reviewed By: ajkr, hx235

Differential Revision: D37792644

Pulled By: littlepig2013

fbshipit-source-id: 7fecb7c4ffd97b34bbf6e3b760b2c35a772a0657
2022-07-24 11:12:44 -07:00

258 lines
10 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <optional>
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_garbage_meter.h"
#include "db/compaction/compaction.h"
#include "db/compaction/compaction_iterator.h"
#include "db/compaction/compaction_outputs.h"
#include "db/internal_stats.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
namespace ROCKSDB_NAMESPACE {
// Maintains state and outputs for each sub-compaction
// It contains 2 `CompactionOutputs`:
// 1. one for the normal output files
// 2. another for the penultimate level outputs
// a `current` pointer maintains the current output group, when calling
// `AddToOutput()`, it checks the output of the current compaction_iterator key
// and point `current` to the target output group. By default, it just points to
// normal compaction_outputs, if the compaction_iterator key should be placed on
// the penultimate level, `current` is changed to point to
// `penultimate_level_outputs`.
// The later operations uses `Current()` to get the target group.
//
// +----------+ +-----------------------------+ +---------+
// | *current |--------> | compaction_outputs |----->| output |
// +----------+ +-----------------------------+ +---------+
// | | output |
// | +---------+
// | | ... |
// |
// | +-----------------------------+ +---------+
// +-------------> | penultimate_level_outputs |----->| output |
// +-----------------------------+ +---------+
// | ... |
class SubcompactionState {
public:
const Compaction* compaction;
// The boundaries of the key-range this compaction is interested in. No two
// sub-compactions may have overlapping key-ranges.
// 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
const std::optional<Slice> start, end;
// The return status of this sub-compaction
Status status;
// The return IO Status of this sub-compaction
IOStatus io_status;
// Notify on sub-compaction completion only if listener was notified on
// sub-compaction begin.
bool notify_on_subcompaction_completion = false;
// compaction job stats for this sub-compaction
CompactionJobStats compaction_job_stats;
// sub-compaction job id, which is used to identify different sub-compaction
// within the same compaction job.
const uint32_t sub_job_id;
Slice SmallestUserKey() const;
Slice LargestUserKey() const;
// Get all outputs from the subcompaction. For per_key_placement compaction,
// it returns both the last level outputs and penultimate level outputs.
OutputIterator GetOutputs() const;
// Assign range dels aggregator, for each range_del, it can only be assigned
// to one output level, for per_key_placement, it's going to be the
// penultimate level.
void AssignRangeDelAggregator(
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
if (compaction->SupportsPerKeyPlacement()) {
penultimate_level_outputs_.AssignRangeDelAggregator(
std::move(range_del_agg));
} else {
compaction_outputs_.AssignRangeDelAggregator(std::move(range_del_agg));
}
}
void RemoveLastEmptyOutput() {
compaction_outputs_.RemoveLastEmptyOutput();
penultimate_level_outputs_.RemoveLastEmptyOutput();
}
#ifndef ROCKSDB_LITE
void BuildSubcompactionJobInfo(
SubcompactionJobInfo& subcompaction_job_info) const {
const Compaction* c = compaction;
const ColumnFamilyData* cfd = c->column_family_data();
subcompaction_job_info.cf_id = cfd->GetID();
subcompaction_job_info.cf_name = cfd->GetName();
subcompaction_job_info.status = status;
subcompaction_job_info.subcompaction_job_id = static_cast<int>(sub_job_id);
subcompaction_job_info.base_input_level = c->start_level();
subcompaction_job_info.output_level = c->output_level();
subcompaction_job_info.stats = compaction_job_stats;
}
#endif // !ROCKSDB_LITE
SubcompactionState() = delete;
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
SubcompactionState(Compaction* c, const std::optional<Slice> _start,
const std::optional<Slice> _end, uint32_t _sub_job_id)
: compaction(c),
start(_start),
end(_end),
sub_job_id(_sub_job_id),
compaction_outputs_(c, /*is_penultimate_level=*/false),
penultimate_level_outputs_(c, /*is_penultimate_level=*/true) {
assert(compaction != nullptr);
const InternalKeyComparator* icmp =
&compaction->column_family_data()->internal_comparator();
const InternalKey* output_split_key = compaction->GetOutputSplitKey();
// Invalid output_split_key indicates that we do not need to split
if (output_split_key != nullptr) {
// We may only split the output when the cursor is in the range. Split
if ((!end.has_value() ||
icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()), end.value()) < 0) &&
(!start.has_value() || icmp->user_comparator()->Compare(
ExtractUserKey(output_split_key->Encode()),
start.value()) > 0)) {
local_output_split_key_ = output_split_key;
}
}
}
SubcompactionState(SubcompactionState&& state) noexcept
: compaction(state.compaction),
start(state.start),
end(state.end),
status(std::move(state.status)),
io_status(std::move(state.io_status)),
notify_on_subcompaction_completion(
state.notify_on_subcompaction_completion),
compaction_job_stats(std::move(state.compaction_job_stats)),
sub_job_id(state.sub_job_id),
files_to_cut_for_ttl_(std::move(state.files_to_cut_for_ttl_)),
cur_files_to_cut_for_ttl_(state.cur_files_to_cut_for_ttl_),
next_files_to_cut_for_ttl_(state.next_files_to_cut_for_ttl_),
grandparent_index_(state.grandparent_index_),
overlapped_bytes_(state.overlapped_bytes_),
seen_key_(state.seen_key_),
compaction_outputs_(std::move(state.compaction_outputs_)),
penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)),
is_current_penultimate_level_(state.is_current_penultimate_level_),
has_penultimate_level_outputs_(state.has_penultimate_level_outputs_) {
current_outputs_ = is_current_penultimate_level_
? &penultimate_level_outputs_
: &compaction_outputs_;
}
bool HasPenultimateLevelOutputs() const {
return has_penultimate_level_outputs_ ||
penultimate_level_outputs_.HasRangeDel();
}
void FillFilesToCutForTtl();
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key);
bool IsCurrentPenultimateLevel() const {
return is_current_penultimate_level_;
}
// Add all the new files from this compaction to version_edit
void AddOutputsEdit(VersionEdit* out_edit) const {
for (const auto& file : penultimate_level_outputs_.outputs_) {
out_edit->AddFile(compaction->GetPenultimateLevel(), file.meta);
}
for (const auto& file : compaction_outputs_.outputs_) {
out_edit->AddFile(compaction->output_level(), file.meta);
}
}
void Cleanup(Cache* cache);
void AggregateCompactionStats(
InternalStats::CompactionStatsFull& compaction_stats) const;
CompactionOutputs& Current() const {
assert(current_outputs_);
return *current_outputs_;
}
// Add compaction_iterator key/value to the `Current` output group.
Status AddToOutput(const CompactionIterator& iter,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func);
// Close all compaction output files, both output_to_penultimate_level outputs
// and normal outputs.
Status CloseCompactionFiles(const Status& curr_status,
const CompactionFileOpenFunc& open_file_func,
const CompactionFileCloseFunc& close_file_func) {
// Call FinishCompactionOutputFile() even if status is not ok: it needs to
// close the output file.
Status s = penultimate_level_outputs_.CloseOutput(
curr_status, open_file_func, close_file_func);
s = compaction_outputs_.CloseOutput(s, open_file_func, close_file_func);
return s;
}
private:
// Some identified files with old oldest ancester time and the range should be
// isolated out so that the output file(s) in that range can be merged down
// for TTL and clear the timestamps for the range.
std::vector<FileMetaData*> files_to_cut_for_ttl_;
int cur_files_to_cut_for_ttl_ = -1;
int next_files_to_cut_for_ttl_ = 0;
// An index that used to speed up ShouldStopBefore().
size_t grandparent_index_ = 0;
// The number of bytes overlapping between the current output and
// grandparent files used in ShouldStopBefore().
uint64_t overlapped_bytes_ = 0;
// A flag determines whether the key has been seen in ShouldStopBefore()
bool seen_key_ = false;
// A flag determines if this subcompaction has been split by the cursor
bool is_split_ = false;
// We also maintain the output split key for each subcompaction to avoid
// repetitive comparison in ShouldStopBefore()
const InternalKey* local_output_split_key_ = nullptr;
// State kept for output being generated
CompactionOutputs compaction_outputs_;
CompactionOutputs penultimate_level_outputs_;
CompactionOutputs* current_outputs_ = &compaction_outputs_;
bool is_current_penultimate_level_ = false;
bool has_penultimate_level_outputs_ = false;
};
} // namespace ROCKSDB_NAMESPACE