rocksdb/db/builder.cc
Peter Dillinger 02443dd93f Refactor, clean up, fixes, and more testing for SeqnoToTimeMapping (#11905)
Summary:
This change precedes a planned DBImpl change that will ensure all sufficiently recent sequence numbers since Open are covered by SeqnoToTimeMapping (a bug fix; existing tests carry work-arounds). **Intended follow-up**

However, I found enough issues with SeqnoToTimeMapping to warrant this PR first, including very small fixes in DB implementation related to API contract of SeqnoToTimeMapping.

Functional fixes / changes:
* This fixes some mishandling of boundary cases. For example, if the user stopped writing to the DB, the last written sequence number would perpetually have its write time updated to "now" and would never become eligible for migration to the cold tier. Part of the problem is that SeqnoToTimeMapping would return a seqno known to have been written before (immediately or otherwise) the requested time, but compaction_job.cc would include that seqno in the preserve/exclude set. That is fixed (in part) by adding one to the returned seqno in compaction_job.cc.
* That problem was worse because a whole range of seqnos could be updated perpetually with new times in SeqnoToTimeMapping::Append (if no writes to DB). That logic was apparently optimized for GetOldestApproximateTime (now GetProximalTimeBeforeSeqno), which is not used in production, to the detriment of GetOldestSequenceNum (now GetProximalSeqnoBeforeTime), which is used in production. (Perhaps plans changed during development?) This is fixed in Append to optimize for accuracy of GetProximalSeqnoBeforeTime. (Unit tests added and updated.)
* Related: SeqnoToTimeMapping did not have a clear contract about the relationships between seqnos and times, just the idea of a rough correspondence. Now the class description makes it clear that the write time of each recorded seqno comes before or at the associated time, to support getting best results for GetProximalSeqnoBeforeTime. And this makes it easier to make clear the contract of each API function.
  * Update `DBImpl::RecordSeqnoToTimeMapping()` to follow this ordering in gathering samples.
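
To make the contract above concrete, here is a minimal sketch of a "proximal seqno before time" lookup, assuming each recorded seqno was written at or before its associated time. `SeqnoToTimeSketch`, `SeqnoTimePairSketch`, and `FirstSeqnoToPreserve` are hypothetical names for illustration only, not the RocksDB classes or the code in compaction_job.cc.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <vector>

// Hypothetical, simplified stand-in for the mapping; not the RocksDB class.
struct SeqnoTimePairSketch {
  uint64_t seqno;
  uint64_t time;  // Assumed contract: seqno was written at or before `time`.
};

class SeqnoToTimeSketch {
 public:
  // Entries are assumed to arrive in nondecreasing seqno and time order.
  void Append(uint64_t seqno, uint64_t time) {
    assert(pairs_.empty() ||
           (seqno >= pairs_.back().seqno && time >= pairs_.back().time));
    pairs_.push_back({seqno, time});
  }

  // Returns the largest recorded seqno whose associated time is <= `time`,
  // or 0 when no recorded entry is that old.
  uint64_t ProximalSeqnoBeforeTime(uint64_t time) const {
    // First entry with a time strictly greater than `time` (or end).
    auto it = std::upper_bound(
        pairs_.begin(), pairs_.end(), time,
        [](uint64_t t, const SeqnoTimePairSketch& p) { return t < p.time; });
    if (it == pairs_.begin()) {
      return 0;
    }
    return std::prev(it)->seqno;
  }

 private:
  std::vector<SeqnoTimePairSketch> pairs_;
};

// The returned seqno is known to be old enough, so the first seqno that still
// needs preserving is the next one. This mirrors the "adding one" idea only.
inline uint64_t FirstSeqnoToPreserve(const SeqnoToTimeSketch& mapping,
                                     uint64_t preserve_time) {
  return mapping.ProximalSeqnoBeforeTime(preserve_time) + 1;
}
```

With entries (10, 100) and (20, 200), a lookup at time 150 yields seqno 10, so the sketch would treat seqno 11 and later as still too recent to migrate.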

Some of these changes required expanding a test work-around for the problem (see intended follow-up above) that the DB does not immediately ensure recent seqnos are covered by its mapping. These work-arounds will be removed with that planned work.

An apparent compaction bug is revealed in PrecludeLastLevelTest::RangeDelsCauseFileEndpointsToOverlap, so that test is disabled. Filed GitHub issue #11909.

Cosmetic / code safety things (not exhaustive):
* Fix some confusing names.
  * `seqno_time_mapping` was used inconsistently in places. Now just `seqno_to_time_mapping` to correspond to class name.
  * Rename confusing `GetOldestSequenceNum` -> `GetProximalSeqnoBeforeTime` and `GetOldestApproximateTime` -> `GetProximalTimeBeforeSeqno`. Part of the motivation is that our times and seqnos here have the same underlying type, so we want to be clear about which is expected where to avoid mixing.
  * Rename `kUnknownSeqnoTime` to `kUnknownTimeBeforeAll` because the value is a bad choice for unknown if we ever add ProximalAfterBlah functions.
  * Arithmetic on SeqnoTimePair doesn't make sense except for delta encoding, so use better names / APIs with that in mind.
  * (OMG) Don't allow direct comparison between SeqnoTimePair and SequenceNumber. (There is no checking that it isn't compared against time by accident.)
  * A field name essentially matching the containing class name is a confusing pattern (`seqno_time_mapping_`).
  * Wrap calls to confusing (but useful) upper_bound and lower_bound functions to have clearer names and more code reuse.
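
As an illustration of the last point, wrappers of roughly this shape give `std::upper_bound` and `std::lower_bound` names that say which field is searched and whether the bound is strict. This is a sketch under assumptions: `PairSketch`, `PairList`, and the three function names are chosen here for illustration and are not claimed to match the signatures in the PR.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical pair type; the list is assumed sorted by both seqno and time.
struct PairSketch {
  uint64_t seqno;
  uint64_t time;
};
using PairList = std::vector<PairSketch>;

// First entry with time > `time`, or end().
inline PairList::const_iterator FindGreaterTime(const PairList& v,
                                                uint64_t time) {
  return std::upper_bound(
      v.begin(), v.end(), time,
      [](uint64_t value, const PairSketch& p) { return value < p.time; });
}

// First entry with seqno > `seqno`, or end().
inline PairList::const_iterator FindGreaterSeqno(const PairList& v,
                                                 uint64_t seqno) {
  return std::upper_bound(
      v.begin(), v.end(), seqno,
      [](uint64_t value, const PairSketch& p) { return value < p.seqno; });
}

// First entry with seqno >= `seqno`, or end().
inline PairList::const_iterator FindGreaterEqSeqno(const PairList& v,
                                                   uint64_t seqno) {
  return std::lower_bound(
      v.begin(), v.end(), seqno,
      [](const PairSketch& p, uint64_t value) { return p.seqno < value; });
}
```

Because seqnos and times share an underlying integer type, putting the field name in the function name is what keeps a caller from passing a time where a seqno is expected.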

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11905

Test Plan: GetOldestSequenceNum (now GetProximalSeqnoBeforeTime) and TruncateOldEntries were lacking unit tests, despite both being used in production (experimental feature). Added those and expanded others.

Reviewed By: jowlyzhang

Differential Revision: D49755592

Pulled By: pdillinger

fbshipit-source-id: f72a3baac74d24b963c77e538bba89a7fc8dce51
2023-09-29 11:21:59 -07:00


// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/builder.h"
#include <algorithm>
#include <deque>
#include <vector>
#include "db/blob/blob_file_builder.h"
#include "db/compaction/compaction_iterator.h"
#include "db/dbformat.h"
#include "db/event_helpers.h"
#include "db/internal_stats.h"
#include "db/merge_helper.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "options/options_helper.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/format.h"
#include "table/internal_iterator.h"
#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {

class TableFactory;

TableBuilder* NewTableBuilder(const TableBuilderOptions& tboptions,
                              WritableFileWriter* file) {
  assert((tboptions.column_family_id ==
          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
         tboptions.column_family_name.empty());
  return tboptions.ioptions.table_factory->NewTableBuilder(tboptions, file);
}

Status BuildTable(
    const std::string& dbname, VersionSet* versions,
    const ImmutableDBOptions& db_options, const TableBuilderOptions& tboptions,
    const FileOptions& file_options, const ReadOptions& read_options,
    TableCache* table_cache, InternalIterator* iter,
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
        range_del_iters,
    FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions,
    std::vector<SequenceNumber> snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker,
    bool paranoid_file_checks, InternalStats* internal_stats,
    IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer,
    BlobFileCreationReason blob_creation_reason,
    const SeqnoToTimeMapping& seqno_to_time_mapping, EventLogger* event_logger,
    int job_id, const Env::IOPriority io_priority,
    TableProperties* table_properties, Env::WriteLifeTimeHint write_hint,
    const std::string* full_history_ts_low,
    BlobFileCompletionCallback* blob_callback, Version* version,
    uint64_t* num_input_entries, uint64_t* memtable_payload_bytes,
    uint64_t* memtable_garbage_bytes) {
  assert((tboptions.column_family_id ==
          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
         tboptions.column_family_name.empty());
  auto& mutable_cf_options = tboptions.moptions;
  auto& ioptions = tboptions.ioptions;
  // Report flush IOStats once at least this many bytes (1 MiB) have been
  // written.
  const size_t kReportFlushIOStatsEvery = 1048576;
  OutputValidator output_validator(
      tboptions.internal_comparator,
      /*enable_order_check=*/
      mutable_cf_options.check_flush_compaction_key_order,
      /*enable_hash=*/paranoid_file_checks);
  Status s;
  meta->fd.file_size = 0;
  iter->SeekToFirst();
  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
      new CompactionRangeDelAggregator(&tboptions.internal_comparator,
                                       snapshots, full_history_ts_low));
  uint64_t num_unfragmented_tombstones = 0;
  uint64_t total_tombstone_payload_bytes = 0;
  for (auto& range_del_iter : range_del_iters) {
    num_unfragmented_tombstones +=
        range_del_iter->num_unfragmented_tombstones();
    total_tombstone_payload_bytes +=
        range_del_iter->total_tombstone_payload_bytes();
    range_del_agg->AddTombstones(std::move(range_del_iter));
  }

  std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(),
                                    meta->fd.GetPathId());
  std::vector<std::string> blob_file_paths;
  std::string file_checksum = kUnknownFileChecksum;
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
  EventHelpers::NotifyTableFileCreationStarted(ioptions.listeners, dbname,
                                               tboptions.column_family_name,
                                               fname, job_id, tboptions.reason);
  Env* env = db_options.env;
  assert(env);
  FileSystem* fs = db_options.fs.get();
  assert(fs);

  TableProperties tp;
  bool table_file_created = false;
  if (iter->Valid() || !range_del_agg->IsEmpty()) {
    std::unique_ptr<CompactionFilter> compaction_filter;
    if (ioptions.compaction_filter_factory != nullptr &&
        ioptions.compaction_filter_factory->ShouldFilterTableFileCreation(
            tboptions.reason)) {
      CompactionFilter::Context context;
      context.is_full_compaction = false;
      context.is_manual_compaction = false;
      context.column_family_id = tboptions.column_family_id;
      context.reason = tboptions.reason;
      compaction_filter =
          ioptions.compaction_filter_factory->CreateCompactionFilter(context);
      if (compaction_filter != nullptr &&
          !compaction_filter->IgnoreSnapshots()) {
        s.PermitUncheckedError();
        return Status::NotSupported(
            "CompactionFilter::IgnoreSnapshots() = false is not supported "
            "anymore.");
      }
    }

    TableBuilder* builder;
    std::unique_ptr<WritableFileWriter> file_writer;
    {
      std::unique_ptr<FSWritableFile> file;
#ifndef NDEBUG
      bool use_direct_writes = file_options.use_direct_writes;
      TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif  // !NDEBUG
      IOStatus io_s = NewWritableFile(fs, fname, &file, file_options);
      assert(s.ok());
      s = io_s;
      if (io_status->ok()) {
        *io_status = io_s;
      }
      if (!s.ok()) {
        EventHelpers::LogAndNotifyTableFileCreationFinished(
            event_logger, ioptions.listeners, dbname,
            tboptions.column_family_name, fname, job_id, meta->fd,
            kInvalidBlobFileNumber, tp, tboptions.reason, s, file_checksum,
            file_checksum_func_name);
        return s;
      }

      table_file_created = true;
      FileTypeSet tmp_set = ioptions.checksum_handoff_file_types;
      file->SetIOPriority(io_priority);
      file->SetWriteLifeTimeHint(write_hint);
      file_writer.reset(new WritableFileWriter(
          std::move(file), fname, file_options, ioptions.clock, io_tracer,
          ioptions.stats, ioptions.listeners,
          ioptions.file_checksum_gen_factory.get(),
          tmp_set.Contains(FileType::kTableFile), false));

      builder = NewTableBuilder(tboptions, file_writer.get());
    }

    auto ucmp = tboptions.internal_comparator.user_comparator();
    MergeHelper merge(
        env, ucmp, ioptions.merge_operator.get(), compaction_filter.get(),
        ioptions.logger, true /* internal key corruption is not ok */,
        snapshots.empty() ? 0 : snapshots.back(), snapshot_checker);

    std::unique_ptr<BlobFileBuilder> blob_file_builder(
        (mutable_cf_options.enable_blob_files &&
         tboptions.level_at_creation >=
             mutable_cf_options.blob_file_starting_level &&
         blob_file_additions)
            ? new BlobFileBuilder(
                  versions, fs, &ioptions, &mutable_cf_options, &file_options,
                  tboptions.db_id, tboptions.db_session_id, job_id,
                  tboptions.column_family_id, tboptions.column_family_name,
                  io_priority, write_hint, io_tracer, blob_callback,
                  blob_creation_reason, &blob_file_paths, blob_file_additions)
            : nullptr);

    const std::atomic<bool> kManualCompactionCanceledFalse{false};
    CompactionIterator c_iter(
        iter, ucmp, &merge, kMaxSequenceNumber, &snapshots,
        earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
        ShouldReportDetailedTime(env, ioptions.stats),
        true /* internal key corruption is not ok */, range_del_agg.get(),
        blob_file_builder.get(), ioptions.allow_data_in_errors,
        ioptions.enforce_single_del_contracts,
        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
        true /* must_count_input_entries */,
        /*compaction=*/nullptr, compaction_filter.get(),
        /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low);

    const size_t ts_sz = ucmp->timestamp_size();
    const bool strip_timestamp =
        ts_sz > 0 && !ioptions.persist_user_defined_timestamps;

    std::string key_after_flush_buf;
    c_iter.SeekToFirst();
    for (; c_iter.Valid(); c_iter.Next()) {
      const Slice& key = c_iter.key();
      const Slice& value = c_iter.value();
      const ParsedInternalKey& ikey = c_iter.ikey();
      Slice key_after_flush = key;
      // If user-defined timestamps will be stripped from the user key after
      // flush, the in-memory version of the key acts logically the same as one
      // with a minimum timestamp. We update the timestamp here so that the
      // file boundaries, output validator, and block builder all see the
      // effect of the stripping.
      if (strip_timestamp) {
        key_after_flush_buf.clear();
        ReplaceInternalKeyWithMinTimestamp(&key_after_flush_buf, key, ts_sz);
        key_after_flush = key_after_flush_buf;
      }

      // Generate a rolling 64-bit hash of the key and values.
      // Note: here "key" integrates 'sequence_number'+'kType'+'user key'.
      s = output_validator.Add(key_after_flush, value);
      if (!s.ok()) {
        break;
      }
      builder->Add(key_after_flush, value);

      s = meta->UpdateBoundaries(key_after_flush, value, ikey.sequence,
                                 ikey.type);
      if (!s.ok()) {
        break;
      }

      // TODO(noetzli): Update stats after flush, too.
      if (io_priority == Env::IO_HIGH &&
          IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
        ThreadStatusUtil::SetThreadOperationProperty(
            ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
      }
    }
    if (!s.ok()) {
      c_iter.status().PermitUncheckedError();
    } else if (!c_iter.status().ok()) {
      s = c_iter.status();
    }

    if (s.ok()) {
      auto range_del_it = range_del_agg->NewIterator();
      Slice last_tombstone_start_user_key{};
      for (range_del_it->SeekToFirst(); range_del_it->Valid();
           range_del_it->Next()) {
        auto tombstone = range_del_it->Tombstone();
        auto kv = tombstone.Serialize();
        // TODO(yuzhangyu): handle range deletion for UDT in memtables only.
        builder->Add(kv.first.Encode(), kv.second);
        InternalKey tombstone_end = tombstone.SerializeEndKey();
        meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_,
                                       tboptions.internal_comparator);
        if (version) {
          if (last_tombstone_start_user_key.empty() ||
              ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key,
                                            range_del_it->start_key()) < 0) {
            SizeApproximationOptions approx_opts;
            approx_opts.files_size_error_margin = 0.1;
            meta->compensated_range_deletion_size += versions->ApproximateSize(
                approx_opts, read_options, version, kv.first.Encode(),
                tombstone_end.Encode(), 0 /* start_level */, -1 /* end_level */,
                TableReaderCaller::kFlush);
          }
          last_tombstone_start_user_key = range_del_it->start_key();
        }
      }
    }

    TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable");
    const bool empty = builder->IsEmpty();
    if (num_input_entries != nullptr) {
      assert(c_iter.HasNumInputEntryScanned());
      *num_input_entries =
          c_iter.NumInputEntryScanned() + num_unfragmented_tombstones;
    }
    if (!s.ok() || empty) {
      builder->Abandon();
    } else {
      std::string seqno_to_time_mapping_str;
      seqno_to_time_mapping.Encode(
          seqno_to_time_mapping_str, meta->fd.smallest_seqno,
          meta->fd.largest_seqno, meta->file_creation_time);
      builder->SetSeqnoTimeTableProperties(
          seqno_to_time_mapping_str,
          ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO
              ? meta->file_creation_time
              : meta->oldest_ancester_time);
      s = builder->Finish();
    }
    if (io_status->ok()) {
      *io_status = builder->io_status();
    }

    if (s.ok() && !empty) {
      uint64_t file_size = builder->FileSize();
      meta->fd.file_size = file_size;
      meta->tail_size = builder->GetTailSize();
      meta->marked_for_compaction = builder->NeedCompact();
      meta->user_defined_timestamps_persisted =
          ioptions.persist_user_defined_timestamps;
      assert(meta->fd.GetFileSize() > 0);
      tp = builder
               ->GetTableProperties();  // refresh now that builder is finished
      if (memtable_payload_bytes != nullptr &&
          memtable_garbage_bytes != nullptr) {
        const CompactionIterationStats& ci_stats = c_iter.iter_stats();
        uint64_t total_payload_bytes = ci_stats.total_input_raw_key_bytes +
                                       ci_stats.total_input_raw_value_bytes +
                                       total_tombstone_payload_bytes;
        uint64_t total_payload_bytes_written =
            (tp.raw_key_size + tp.raw_value_size);
        // Prevent underflow, which may still happen at this point
        // since we only support inserts, deletes, and deleteRanges.
        if (total_payload_bytes_written <= total_payload_bytes) {
          *memtable_payload_bytes = total_payload_bytes;
          *memtable_garbage_bytes =
              total_payload_bytes - total_payload_bytes_written;
        } else {
          *memtable_payload_bytes = 0;
          *memtable_garbage_bytes = 0;
        }
      }
      if (table_properties) {
        *table_properties = tp;
      }
    }
    delete builder;

    // Finish and check for file errors
    TEST_SYNC_POINT("BuildTable:BeforeSyncTable");
    if (s.ok() && !empty) {
      StopWatch sw(ioptions.clock, ioptions.stats, TABLE_SYNC_MICROS);
      *io_status = file_writer->Sync(ioptions.use_fsync);
    }
    TEST_SYNC_POINT("BuildTable:BeforeCloseTableFile");
    if (s.ok() && io_status->ok() && !empty) {
      *io_status = file_writer->Close();
    }
    if (s.ok() && io_status->ok() && !empty) {
      // Add the checksum information to file metadata.
      meta->file_checksum = file_writer->GetFileChecksum();
      meta->file_checksum_func_name = file_writer->GetFileChecksumFuncName();
      file_checksum = meta->file_checksum;
      file_checksum_func_name = meta->file_checksum_func_name;
      // Set unique_id only if db_id and db_session_id exist
      if (!tboptions.db_id.empty() && !tboptions.db_session_id.empty()) {
        if (!GetSstInternalUniqueId(tboptions.db_id, tboptions.db_session_id,
                                    meta->fd.GetNumber(), &(meta->unique_id))
                 .ok()) {
          // If getting the unique id failed, just set it to null.
          meta->unique_id = kNullUniqueId64x2;
        }
      }
    }

    if (s.ok()) {
      s = *io_status;
    }

    // TODO(yuzhangyu): handle the key copy in the blob when ts should be
    // stripped.
    if (blob_file_builder) {
      if (s.ok()) {
        s = blob_file_builder->Finish();
      } else {
        blob_file_builder->Abandon(s);
      }
      blob_file_builder.reset();
    }

    // TODO: Also check the IO status when creating the iterator.

    TEST_SYNC_POINT("BuildTable:BeforeOutputValidation");
    if (s.ok() && !empty) {
      // Verify that the table is usable
      // We set for_compaction to false and don't OptimizeForCompactionTableRead
      // here because this is a special case after we finish the table building.
      // No matter whether use_direct_io_for_flush_and_compaction is true,
      // the goal is to cache it here for further user reads.
      std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
          read_options, file_options, tboptions.internal_comparator, *meta,
          nullptr /* range_del_agg */, mutable_cf_options.prefix_extractor,
          nullptr,
          (internal_stats == nullptr) ? nullptr
                                      : internal_stats->GetFileReadHist(0),
          TableReaderCaller::kFlush, /*arena=*/nullptr,
          /*skip_filter=*/false, tboptions.level_at_creation,
          MaxFileSizeForL0MetaPin(mutable_cf_options),
          /*smallest_compaction_key=*/nullptr,
          /*largest_compaction_key*/ nullptr,
          /*allow_unprepared_value*/ false,
          mutable_cf_options.block_protection_bytes_per_key));
      s = it->status();
      if (s.ok() && paranoid_file_checks) {
        OutputValidator file_validator(tboptions.internal_comparator,
                                       /*enable_order_check=*/true,
                                       /*enable_hash=*/true);
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
          // Generate a rolling 64-bit hash of the key and values
          file_validator.Add(it->key(), it->value()).PermitUncheckedError();
        }
        s = it->status();
        if (s.ok() && !output_validator.CompareValidator(file_validator)) {
          s = Status::Corruption("Paranoid checksums do not match");
        }
      }
    }
  }

  // Check for input iterator errors
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (!s.ok() || meta->fd.GetFileSize() == 0) {
    TEST_SYNC_POINT("BuildTable:BeforeDeleteFile");

    constexpr IODebugContext* dbg = nullptr;

    if (table_file_created) {
      Status ignored = fs->DeleteFile(fname, IOOptions(), dbg);
      ignored.PermitUncheckedError();
    }

    assert(blob_file_additions || blob_file_paths.empty());

    if (blob_file_additions) {
      for (const std::string& blob_file_path : blob_file_paths) {
        Status ignored = DeleteDBFile(&db_options, blob_file_path, dbname,
                                      /*force_bg=*/false, /*force_fg=*/false);
        ignored.PermitUncheckedError();
        TEST_SYNC_POINT("BuildTable::AfterDeleteFile");
      }
    }
  }

  Status status_for_listener = s;
  if (meta->fd.GetFileSize() == 0) {
    fname = "(nil)";
    if (s.ok()) {
      status_for_listener = Status::Aborted("Empty SST file not kept");
    }
  }
  // Output to event logger and fire events.
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger, ioptions.listeners, dbname, tboptions.column_family_name,
      fname, job_id, meta->fd, meta->oldest_blob_file_number, tp,
      tboptions.reason, status_for_listener, file_checksum,
      file_checksum_func_name);

  return s;
}

}  // namespace ROCKSDB_NAMESPACE