mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
02443dd93f
Summary: This change is before a planned DBImpl change to ensure all sufficiently recent sequence numbers since Open are covered by SeqnoToTimeMapping (bug fix with existing test work-arounds). **Intended follow-up** However, I found enough issues with SeqnoToTimeMapping to warrant this PR first, including very small fixes in DB implementation related to API contract of SeqnoToTimeMapping. Functional fixes / changes: * This fixes some mishandling of boundary cases. For example, if the user decides to stop writing to DB, the last written sequence number would perpetually have its write time updated to "now" and would always be ineligible for migration to cold tier. Part of the problem is that the SeqnoToTimeMapping would return a seqno known to have been written before (immediately or otherwise) the requested time, but compaction_job.cc would include that seqno in the preserve/exclude set. That is fixed (in part) by adding one in compaction_job.cc * That problem was worse because a whole range of seqnos could be updated perpetually with new times in SeqnoToTimeMapping::Append (if no writes to DB). That logic was apparently optimized for GetOldestApproximateTime (now GetProximalTimeBeforeSeqno), which is not used in production, to the detriment of GetOldestSequenceNum (now GetProximalSeqnoBeforeTime), which is used in production. (Perhaps plans changed during development?) This is fixed in Append to optimize for accuracy of GetProximalSeqnoBeforeTime. (Unit tests added and updated.) * Related: SeqnoToTimeMapping did not have a clear contract about the relationships between seqnos and times, just the idea of a rough correspondence. Now the class description makes it clear that the write time of each recorded seqno comes before or at the associated time, to support getting best results for GetProximalSeqnoBeforeTime. And this makes it easier to make clear the contract of each API function. * Update `DBImpl::RecordSeqnoToTimeMapping()` to follow this ordering in gathering samples. Some part of these changes has required an expanded test work-around for the problem (see intended follow-up above) that the DB does not immediately ensure recent seqnos are covered by its mapping. These work-arounds will be removed with that planned work. An apparent compaction bug is revealed in PrecludeLastLevelTest::RangeDelsCauseFileEndpointsToOverlap, so that test is disabled. Filed GitHub issue #11909 Cosmetic / code safety things (not exhaustive): * Fix some confusing names. * `seqno_time_mapping` was used inconsistently in places. Now just `seqno_to_time_mapping` to correspond to class name. * Rename confusing `GetOldestSequenceNum` -> `GetProximalSeqnoBeforeTime` and `GetOldestApproximateTime` -> `GetProximalTimeBeforeSeqno`. Part of the motivation is that our times and seqnos here have the same underlying type, so we want to be clear about which is expected where to avoid mixing. * Rename `kUnknownSeqnoTime` to `kUnknownTimeBeforeAll` because the value is a bad choice for unknown if we ever add ProximalAfterBlah functions. * Arithmetic on SeqnoTimePair doesn't make sense except for delta encoding, so use better names / APIs with that in mind. * (OMG) Don't allow direct comparison between SeqnoTimePair and SequenceNumber. (There is no checking that it isn't compared against time by accident.) * A field name essentially matching the containing class name is a confusing pattern (`seqno_time_mapping_`). * Wrap calls to confusing (but useful) upper_bound and lower_bound functions to have clearer names and more code reuse. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11905 Test Plan: GetOldestSequenceNum (now GetProximalSeqnoBeforeTime) and TruncateOldEntries were lacking unit tests, despite both being used in production (experimental feature). Added those and expanded others. Reviewed By: jowlyzhang Differential Revision: D49755592 Pulled By: pdillinger fbshipit-source-id: f72a3baac74d24b963c77e538bba89a7fc8dce51
413 lines
15 KiB
C++
413 lines
15 KiB
C++
// Copyright (c) Meta Platforms, Inc. and affiliates.
|
|
//
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#pragma once
|
|
|
|
#include "db/blob/blob_garbage_meter.h"
|
|
#include "db/compaction/compaction.h"
|
|
#include "db/compaction/compaction_iterator.h"
|
|
#include "db/internal_stats.h"
|
|
#include "db/output_validator.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class CompactionOutputs;
|
|
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
|
|
using CompactionFileCloseFunc =
|
|
std::function<Status(CompactionOutputs&, const Status&, const Slice&)>;
|
|
|
|
// Files produced by subcompaction, most of the functions are used by
|
|
// compaction_job Open/Close compaction file functions.
|
|
class CompactionOutputs {
|
|
public:
|
|
// compaction output file
|
|
struct Output {
|
|
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
|
|
bool _enable_order_check, bool _enable_hash, bool _finished,
|
|
uint64_t precalculated_hash)
|
|
: meta(std::move(_meta)),
|
|
validator(_icmp, _enable_order_check, _enable_hash,
|
|
precalculated_hash),
|
|
finished(_finished) {}
|
|
FileMetaData meta;
|
|
OutputValidator validator;
|
|
bool finished;
|
|
std::shared_ptr<const TableProperties> table_properties;
|
|
};
|
|
|
|
CompactionOutputs() = delete;
|
|
|
|
explicit CompactionOutputs(const Compaction* compaction,
|
|
const bool is_penultimate_level);
|
|
|
|
// Add generated output to the list
|
|
void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
|
|
bool enable_order_check, bool enable_hash,
|
|
bool finished = false, uint64_t precalculated_hash = 0) {
|
|
outputs_.emplace_back(std::move(meta), icmp, enable_order_check,
|
|
enable_hash, finished, precalculated_hash);
|
|
}
|
|
|
|
// Set new table builder for the current output
|
|
void NewBuilder(const TableBuilderOptions& tboptions);
|
|
|
|
// Assign a new WritableFileWriter to the current output
|
|
void AssignFileWriter(WritableFileWriter* writer) {
|
|
file_writer_.reset(writer);
|
|
}
|
|
|
|
// TODO: Remove it when remote compaction support tiered compaction
|
|
void SetTotalBytes(uint64_t bytes) { stats_.bytes_written += bytes; }
|
|
void SetNumOutputRecords(uint64_t num) { stats_.num_output_records = num; }
|
|
|
|
// TODO: Move the BlobDB builder into CompactionOutputs
|
|
const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
|
|
if (is_penultimate_level_) {
|
|
assert(blob_file_additions_.empty());
|
|
}
|
|
return blob_file_additions_;
|
|
}
|
|
|
|
std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
|
|
assert(!is_penultimate_level_);
|
|
return &blob_file_additions_;
|
|
}
|
|
|
|
bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
|
|
|
|
BlobGarbageMeter* CreateBlobGarbageMeter() {
|
|
assert(!is_penultimate_level_);
|
|
blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
|
|
return blob_garbage_meter_.get();
|
|
}
|
|
|
|
BlobGarbageMeter* GetBlobGarbageMeter() const {
|
|
if (is_penultimate_level_) {
|
|
// blobdb doesn't support per_key_placement yet
|
|
assert(blob_garbage_meter_ == nullptr);
|
|
return nullptr;
|
|
}
|
|
return blob_garbage_meter_.get();
|
|
}
|
|
|
|
void UpdateBlobStats() {
|
|
assert(!is_penultimate_level_);
|
|
stats_.num_output_files_blob = blob_file_additions_.size();
|
|
for (const auto& blob : blob_file_additions_) {
|
|
stats_.bytes_written_blob += blob.GetTotalBlobBytes();
|
|
}
|
|
}
|
|
|
|
// Finish the current output file
|
|
Status Finish(const Status& intput_status,
|
|
const SeqnoToTimeMapping& seqno_to_time_mapping);
|
|
|
|
// Update output table properties from table builder
|
|
void UpdateTableProperties() {
|
|
current_output().table_properties =
|
|
std::make_shared<TableProperties>(GetTableProperties());
|
|
}
|
|
|
|
IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
|
|
Statistics* statistics, bool use_fsync);
|
|
|
|
TableProperties GetTableProperties() {
|
|
return builder_->GetTableProperties();
|
|
}
|
|
|
|
Slice SmallestUserKey() const {
|
|
if (!outputs_.empty() && outputs_[0].finished) {
|
|
return outputs_[0].meta.smallest.user_key();
|
|
} else {
|
|
return Slice{nullptr, 0};
|
|
}
|
|
}
|
|
|
|
Slice LargestUserKey() const {
|
|
if (!outputs_.empty() && outputs_.back().finished) {
|
|
return outputs_.back().meta.largest.user_key();
|
|
} else {
|
|
return Slice{nullptr, 0};
|
|
}
|
|
}
|
|
|
|
// In case the last output file is empty, which doesn't need to keep.
|
|
void RemoveLastEmptyOutput() {
|
|
if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
|
|
// An error occurred, so ignore the last output.
|
|
outputs_.pop_back();
|
|
}
|
|
}
|
|
|
|
// Remove the last output, for example the last output doesn't have data (no
|
|
// entry and no range-dels), but file_size might not be 0, as it has SST
|
|
// metadata.
|
|
void RemoveLastOutput() {
|
|
assert(!outputs_.empty());
|
|
outputs_.pop_back();
|
|
}
|
|
|
|
bool HasBuilder() const { return builder_ != nullptr; }
|
|
|
|
FileMetaData* GetMetaData() { return ¤t_output().meta; }
|
|
|
|
bool HasOutput() const { return !outputs_.empty(); }
|
|
|
|
uint64_t NumEntries() const { return builder_->NumEntries(); }
|
|
|
|
void ResetBuilder() {
|
|
builder_.reset();
|
|
current_output_file_size_ = 0;
|
|
}
|
|
|
|
// Add range deletions from the range_del_agg_ to the current output file.
|
|
// Input parameters, `range_tombstone_lower_bound_` and current output's
|
|
// metadata determine the bounds on range deletions to add. Updates output
|
|
// file metadata boundary if extended by range tombstones.
|
|
//
|
|
// @param comp_start_user_key and comp_end_user_key include timestamp if
|
|
// user-defined timestamp is enabled. Their timestamp should be max timestamp.
|
|
// @param next_table_min_key internal key lower bound for the next compaction
|
|
// output.
|
|
// @param full_history_ts_low used for range tombstone garbage collection.
|
|
Status AddRangeDels(const Slice* comp_start_user_key,
|
|
const Slice* comp_end_user_key,
|
|
CompactionIterationStats& range_del_out_stats,
|
|
bool bottommost_level, const InternalKeyComparator& icmp,
|
|
SequenceNumber earliest_snapshot,
|
|
const Slice& next_table_min_key,
|
|
const std::string& full_history_ts_low);
|
|
|
|
// if the outputs have range delete, range delete is also data
|
|
bool HasRangeDel() const {
|
|
return range_del_agg_ && !range_del_agg_->IsEmpty();
|
|
}
|
|
|
|
private:
|
|
friend class SubcompactionState;
|
|
|
|
void FillFilesToCutForTtl();
|
|
|
|
void SetOutputSlitKey(const std::optional<Slice> start,
|
|
const std::optional<Slice> end) {
|
|
const InternalKeyComparator* icmp =
|
|
&compaction_->column_family_data()->internal_comparator();
|
|
|
|
const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
|
|
// Invalid output_split_key indicates that we do not need to split
|
|
if (output_split_key != nullptr) {
|
|
// We may only split the output when the cursor is in the range. Split
|
|
if ((!end.has_value() ||
|
|
icmp->user_comparator()->Compare(
|
|
ExtractUserKey(output_split_key->Encode()), *end) < 0) &&
|
|
(!start.has_value() ||
|
|
icmp->user_comparator()->Compare(
|
|
ExtractUserKey(output_split_key->Encode()), *start) > 0)) {
|
|
local_output_split_key_ = output_split_key;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Returns true iff we should stop building the current output
|
|
// before processing the current key in compaction iterator.
|
|
bool ShouldStopBefore(const CompactionIterator& c_iter);
|
|
|
|
void Cleanup() {
|
|
if (builder_ != nullptr) {
|
|
// May happen if we get a shutdown call in the middle of compaction
|
|
builder_->Abandon();
|
|
builder_.reset();
|
|
}
|
|
}
|
|
|
|
// Updates states related to file cutting for TTL.
|
|
// Returns a boolean value indicating whether the current
|
|
// compaction output file should be cut before `internal_key`.
|
|
//
|
|
// @param internal_key the current key to be added to output.
|
|
bool UpdateFilesToCutForTTLStates(const Slice& internal_key);
|
|
|
|
// update tracked grandparents information like grandparent index, if it's
|
|
// in the gap between 2 grandparent files, accumulated grandparent files size
|
|
// etc.
|
|
// It returns how many boundaries it crosses by including current key.
|
|
size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);
|
|
|
|
// helper function to get the overlapped grandparent files size, it's only
|
|
// used for calculating the first key's overlap.
|
|
uint64_t GetCurrentKeyGrandparentOverlappedBytes(
|
|
const Slice& internal_key) const;
|
|
|
|
// Add current key from compaction_iterator to the output file. If needed
|
|
// close and open new compaction output with the functions provided.
|
|
Status AddToOutput(const CompactionIterator& c_iter,
|
|
const CompactionFileOpenFunc& open_file_func,
|
|
const CompactionFileCloseFunc& close_file_func);
|
|
|
|
// Close the current output. `open_file_func` is needed for creating new file
|
|
// for range-dels only output file.
|
|
Status CloseOutput(const Status& curr_status,
|
|
const CompactionFileOpenFunc& open_file_func,
|
|
const CompactionFileCloseFunc& close_file_func) {
|
|
Status status = curr_status;
|
|
// handle subcompaction containing only range deletions
|
|
if (status.ok() && !HasBuilder() && !HasOutput() && HasRangeDel()) {
|
|
status = open_file_func(*this);
|
|
}
|
|
if (HasBuilder()) {
|
|
const Slice empty_key{};
|
|
Status s = close_file_func(*this, status, empty_key);
|
|
if (!s.ok() && status.ok()) {
|
|
status = s;
|
|
}
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
// This subcompaction's output could be empty if compaction was aborted before
|
|
// this subcompaction had a chance to generate any output files. When
|
|
// subcompactions are executed sequentially this is more likely and will be
|
|
// particularly likely for the later subcompactions to be empty. Once they are
|
|
// run in parallel however it should be much rarer.
|
|
// It's caller's responsibility to make sure it's not empty.
|
|
Output& current_output() {
|
|
assert(!outputs_.empty());
|
|
return outputs_.back();
|
|
}
|
|
|
|
// Assign the range_del_agg to the target output level. There's only one
|
|
// range-del-aggregator per compaction outputs, for
|
|
// output_to_penultimate_level compaction it is only assigned to the
|
|
// penultimate level.
|
|
void AssignRangeDelAggregator(
|
|
std::unique_ptr<CompactionRangeDelAggregator>&& range_del_agg) {
|
|
assert(range_del_agg_ == nullptr);
|
|
range_del_agg_ = std::move(range_del_agg);
|
|
}
|
|
|
|
const Compaction* compaction_;
|
|
|
|
// current output builder and writer
|
|
std::unique_ptr<TableBuilder> builder_;
|
|
std::unique_ptr<WritableFileWriter> file_writer_;
|
|
uint64_t current_output_file_size_ = 0;
|
|
|
|
// all the compaction outputs so far
|
|
std::vector<Output> outputs_;
|
|
|
|
// BlobDB info
|
|
std::vector<BlobFileAddition> blob_file_additions_;
|
|
std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
|
|
|
|
// Basic compaction output stats for this level's outputs
|
|
InternalStats::CompactionOutputsStats stats_;
|
|
|
|
// indicate if this CompactionOutputs obj for penultimate_level, should always
|
|
// be false if per_key_placement feature is not enabled.
|
|
const bool is_penultimate_level_;
|
|
std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_ = nullptr;
|
|
|
|
// partitioner information
|
|
std::string last_key_for_partitioner_;
|
|
std::unique_ptr<SstPartitioner> partitioner_;
|
|
|
|
// A flag determines if this subcompaction has been split by the cursor
|
|
// for RoundRobin compaction
|
|
bool is_split_ = false;
|
|
|
|
// We also maintain the output split key for each subcompaction to avoid
|
|
// repetitive comparison in ShouldStopBefore()
|
|
const InternalKey* local_output_split_key_ = nullptr;
|
|
|
|
// Some identified files with old oldest ancester time and the range should be
|
|
// isolated out so that the output file(s) in that range can be merged down
|
|
// for TTL and clear the timestamps for the range.
|
|
std::vector<FileMetaData*> files_to_cut_for_ttl_;
|
|
int cur_files_to_cut_for_ttl_ = -1;
|
|
int next_files_to_cut_for_ttl_ = 0;
|
|
|
|
// An index that used to speed up ShouldStopBefore().
|
|
size_t grandparent_index_ = 0;
|
|
|
|
// if the output key is being grandparent files gap, so:
|
|
// key > grandparents[grandparent_index_ - 1].largest &&
|
|
// key < grandparents[grandparent_index_].smallest
|
|
bool being_grandparent_gap_ = true;
|
|
|
|
// The number of bytes overlapping between the current output and
|
|
// grandparent files used in ShouldStopBefore().
|
|
uint64_t grandparent_overlapped_bytes_ = 0;
|
|
|
|
// A flag determines whether the key has been seen in ShouldStopBefore()
|
|
bool seen_key_ = false;
|
|
|
|
// for the current output file, how many file boundaries has it crossed,
|
|
// basically number of files overlapped * 2
|
|
size_t grandparent_boundary_switched_num_ = 0;
|
|
|
|
// The smallest key of the current output file, this is set when current
|
|
// output file's smallest key is a range tombstone start key.
|
|
InternalKey range_tombstone_lower_bound_;
|
|
|
|
// Used for calls to compaction->KeyRangeNotExistsBeyondOutputLevel() in
|
|
// CompactionOutputs::AddRangeDels().
|
|
// level_ptrs_[i] holds index of the file that was checked during the last
|
|
// call to compaction->KeyRangeNotExistsBeyondOutputLevel(). This allows
|
|
// future calls to the function to pick up where it left off, since each
|
|
// range tombstone added to output file within each subcompaction is in
|
|
// increasing key range.
|
|
std::vector<size_t> level_ptrs_;
|
|
};
|
|
|
|
// helper struct to concatenate the last level and penultimate level outputs
|
|
// which could be replaced by std::ranges::join_view() in c++20
|
|
struct OutputIterator {
|
|
public:
|
|
explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
|
|
const std::vector<CompactionOutputs::Output>& b)
|
|
: a_(a), b_(b) {
|
|
within_a = !a_.empty();
|
|
idx_ = 0;
|
|
}
|
|
|
|
OutputIterator begin() { return *this; }
|
|
|
|
OutputIterator end() { return *this; }
|
|
|
|
size_t size() { return a_.size() + b_.size(); }
|
|
|
|
const CompactionOutputs::Output& operator*() const {
|
|
return within_a ? a_[idx_] : b_[idx_];
|
|
}
|
|
|
|
OutputIterator& operator++() {
|
|
idx_++;
|
|
if (within_a && idx_ >= a_.size()) {
|
|
within_a = false;
|
|
idx_ = 0;
|
|
}
|
|
assert(within_a || idx_ <= b_.size());
|
|
return *this;
|
|
}
|
|
|
|
bool operator!=(const OutputIterator& /*rhs*/) const {
|
|
return within_a || idx_ < b_.size();
|
|
}
|
|
|
|
private:
|
|
const std::vector<CompactionOutputs::Output>& a_;
|
|
const std::vector<CompactionOutputs::Output>& b_;
|
|
bool within_a;
|
|
size_t idx_;
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|