Mirror of https://github.com/facebook/rocksdb.git, synced 2024-11-26 16:30:56 +00:00
cb08a682d4
Summary: The SeqnoToTimeMapping class (RocksDB internal) used by the preserve_internal_time_seconds / preclude_last_level_data_seconds options was essentially in a prototype state, with some significant flaws that would risk biting us some day. This is a big, complicated change because both the implementation and the behavioral requirements of the class needed to be upgraded together. In short, this makes SeqnoToTimeMapping more internally responsible for maintaining good invariants, so that callers don't easily encounter dangerous scenarios.

* Some API functions were confusingly named and structured, so I fully refactored the APIs to use clear naming (e.g. `DecodeFrom` and `CopyFromSeqnoRange`), object states, function preconditions, etc.
* Previously the object could informally be sorted / compacted or not, with limited checking or enforcement of these states. Now there's a well-defined "enforced" state that is consistently checked in debug mode for applicable operations. (I attempted to create a separate "builder" class for unenforced states, but IIRC found that more cumbersome for existing uses than it was worth.)
* Previously, operations would coalesce data in a way that was better for `GetProximalTimeBeforeSeqno` than for `GetProximalSeqnoBeforeTime`, which is odd because the latter is the only one currently used by DB code (what is the seqno cut-off for data definitely older than this given time?). This is now reversed to consistently favor `GetProximalSeqnoBeforeTime`, with that logic concentrated in one place: `SeqnoToTimeMapping::SeqnoTimePair::Merge()`. Unfortunately, a lot of unit test logic was specifically testing the old, suboptimal behavior.
* Previously, the natural behavior of SeqnoToTimeMapping was to THROW AWAY data needed to give reasonable answers to the important `GetProximalSeqnoBeforeTime` queries, because SeqnoToTimeMapping only had a FIFO policy for staying within the entry capacity (except in aggregate+sort+serialize mode). If the DB wasn't extremely careful to avoid gathering too many time mappings, it could lose track of where the seqno cutoff was for cold data (`GetProximalSeqnoBeforeTime()` returning 0), preventing all further data migration to the cold tier until enough time passed for new mappings to catch up with the FIFO purging of old ones. (The problem is not so acute because SST files contain relevant snapshots of the mappings, but it would apply to long-lived memtables.)
* Now the SeqnoToTimeMapping class has fully integrated smarts for keeping a sufficiently complete history, within capacity limits, to give good answers to `GetProximalSeqnoBeforeTime` queries. This fixes the old `// FIXME: be smarter about how we erase to avoid data falling off the front prematurely.`
* Fix an apparent bug in how entries are selected for storing into SST files. Previously, only entries within the seqno range of the file were selected, but that could easily leave a gap at the beginning of the timeline for the file's data, for the purposes of answering GetProximalXXX queries with reasonable accuracy. This could probably lead to the same problem discussed above with naively throwing away entries in FIFO order in the old SeqnoToTimeMapping. The updated testing of GetProximalSeqnoBeforeTime in BasicSeqnoToTimeMapping relies on the fixed behavior.
* Fix a potential compaction CPU efficiency/scaling issue in which each compaction output file would iterate over and sort all seqno-to-time mappings from all compaction input files. Now we distill the input file entries to a constant size before processing each compaction output file.

Intended follow-up (me or others):
* Expand some direct testing of SeqnoToTimeMapping APIs. Here I've focused on updating existing tests to make sense.
* There are likely more gaps in availability of needed SeqnoToTimeMapping data when the DB shuts down and is restarted, at least with WAL.
* The data tracked in the DB could be kept more accurate and limited if it used the oldest seqno of unflushed data. This might require some more API refactoring.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12253

Test Plan: unit tests updated

Reviewed By: jowlyzhang

Differential Revision: D52913733

Pulled By: pdillinger

fbshipit-source-id: 020737fcbbe6212f6701191a6ab86565054c9593
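To make the favored query concrete, here is a minimal, self-contained sketch of the "proximal seqno before time" idea. This is not RocksDB's actual class (the real SeqnoToTimeMapping adds encoding, capacity enforcement, and the Merge() logic described above); the struct, names, and exact semantics below are simplified assumptions for illustration only.

#include <cstdint>
#include <vector>

// Simplified stand-in for SeqnoToTimeMapping: pairs sorted ascending by
// both fields, where a pair (s, t) asserts "sequence number s was written
// at or before wall-clock time t".
struct SimpleSeqnoTimeMapping {
  struct Pair {
    uint64_t seqno;
    uint64_t time;
  };
  std::vector<Pair> pairs;  // kept sorted ascending by seqno and time

  // Largest seqno known to be written at or before `time`, or 0 if the
  // history no longer reaches back that far -- the "lost track of the
  // cold-data cutoff" failure mode described above.
  uint64_t GetProximalSeqnoBeforeTime(uint64_t time) const {
    uint64_t result = 0;
    for (const Pair& p : pairs) {
      if (p.time > time) {
        break;
      }
      result = p.seqno;
    }
    return result;
  }
};

// With pairs {100@1000, 200@2000, 300@3000}, a query at time 2500 yields
// 200: data with seqno <= 200 is known to be older than time 2500, so it is
// safely eligible for the cold tier. If FIFO purging had dropped the two
// oldest pairs, the same query would yield 0 and block all migration.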
330 lines
12 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/event_helpers.h"

#include "rocksdb/convenience.h"
#include "rocksdb/listener.h"
#include "rocksdb/utilities/customizable_util.h"

namespace ROCKSDB_NAMESPACE {

Status EventListener::CreateFromString(const ConfigOptions& config_options,
                                       const std::string& id,
                                       std::shared_ptr<EventListener>* result) {
  return LoadSharedObject<EventListener>(config_options, id, result);
}

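// Example (sketch) of how a caller might use the factory above; the id
// "MyListenerId" is a hypothetical registered name, not one that ships
// with RocksDB:
//
//   ConfigOptions config_options;
//   std::shared_ptr<EventListener> listener;
//   Status s = EventListener::CreateFromString(config_options,
//                                              "MyListenerId", &listener);
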
namespace {
// Division helper that returns 0 instead of dividing by a zero denominator;
// used below for average key/value sizes when a table has no entries.
template <class T>
inline T SafeDivide(T a, T b) {
  return b == 0 ? 0 : a / b;
}
}  // anonymous namespace

void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
  *jwriter << "time_micros"
           << std::chrono::duration_cast<std::chrono::microseconds>(
                  std::chrono::system_clock::now().time_since_epoch())
                  .count();
}

void EventHelpers::NotifyTableFileCreationStarted(
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const std::string& db_name, const std::string& cf_name,
    const std::string& file_path, int job_id, TableFileCreationReason reason) {
  if (listeners.empty()) {
    return;
  }
  TableFileCreationBriefInfo info;
  info.db_name = db_name;
  info.cf_name = cf_name;
  info.file_path = file_path;
  info.job_id = job_id;
  info.reason = reason;
  for (auto& listener : listeners) {
    listener->OnTableFileCreationStarted(info);
  }
}

void EventHelpers::NotifyOnBackgroundError(
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
    bool* auto_recovery) {
  if (listeners.empty()) {
    return;
  }
  db_mutex->AssertHeld();
  // release lock while notifying events
  db_mutex->Unlock();
  for (auto& listener : listeners) {
    listener->OnBackgroundError(reason, bg_error);
    bg_error->PermitUncheckedError();
    if (*auto_recovery) {
      listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
    }
  }
  db_mutex->Lock();
}

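// Descriptive note (not from the original source): the DB mutex is released
// for the duration of the listener callbacks above and re-acquired after,
// so listeners must not assume the mutex is held. The auto_recovery pointer
// lets a listener opt out of automatic recovery in OnErrorRecoveryBegin.
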
void EventHelpers::LogAndNotifyTableFileCreationFinished(
    EventLogger* event_logger,
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const std::string& db_name, const std::string& cf_name,
    const std::string& file_path, int job_id, const FileDescriptor& fd,
    uint64_t oldest_blob_file_number, const TableProperties& table_properties,
    TableFileCreationReason reason, const Status& s,
    const std::string& file_checksum,
    const std::string& file_checksum_func_name) {
  if (s.ok() && event_logger) {
    JSONWriter jwriter;
    AppendCurrentTime(&jwriter);
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
            << "table_file_creation"
            << "file_number" << fd.GetNumber() << "file_size"
            << fd.GetFileSize() << "file_checksum"
            << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
            << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
            << "largest_seqno" << fd.largest_seqno;

    // table_properties
    {
      jwriter << "table_properties";
      jwriter.StartObject();

      // basic properties:
      jwriter << "data_size" << table_properties.data_size << "index_size"
              << table_properties.index_size << "index_partitions"
              << table_properties.index_partitions << "top_level_index_size"
              << table_properties.top_level_index_size
              << "index_key_is_user_key"
              << table_properties.index_key_is_user_key
              << "index_value_is_delta_encoded"
              << table_properties.index_value_is_delta_encoded << "filter_size"
              << table_properties.filter_size << "raw_key_size"
              << table_properties.raw_key_size << "raw_average_key_size"
              << SafeDivide(table_properties.raw_key_size,
                            table_properties.num_entries)
              << "raw_value_size" << table_properties.raw_value_size
              << "raw_average_value_size"
              << SafeDivide(table_properties.raw_value_size,
                            table_properties.num_entries)
              << "num_data_blocks" << table_properties.num_data_blocks
              << "num_entries" << table_properties.num_entries
              << "num_filter_entries" << table_properties.num_filter_entries
              << "num_deletions" << table_properties.num_deletions
              << "num_merge_operands" << table_properties.num_merge_operands
              << "num_range_deletions" << table_properties.num_range_deletions
              << "format_version" << table_properties.format_version
              << "fixed_key_len" << table_properties.fixed_key_len
              << "filter_policy" << table_properties.filter_policy_name
              << "column_family_name" << table_properties.column_family_name
              << "column_family_id" << table_properties.column_family_id
              << "comparator" << table_properties.comparator_name
              << "user_defined_timestamps_persisted"
              << table_properties.user_defined_timestamps_persisted
              << "merge_operator" << table_properties.merge_operator_name
              << "prefix_extractor_name"
              << table_properties.prefix_extractor_name << "property_collectors"
              << table_properties.property_collectors_names << "compression"
              << table_properties.compression_name << "compression_options"
              << table_properties.compression_options << "creation_time"
              << table_properties.creation_time << "oldest_key_time"
              << table_properties.oldest_key_time << "file_creation_time"
              << table_properties.file_creation_time
              << "slow_compression_estimated_data_size"
              << table_properties.slow_compression_estimated_data_size
              << "fast_compression_estimated_data_size"
              << table_properties.fast_compression_estimated_data_size
              << "db_id" << table_properties.db_id << "db_session_id"
              << table_properties.db_session_id << "orig_file_number"
              << table_properties.orig_file_number << "seqno_to_time_mapping";

      if (table_properties.seqno_to_time_mapping.empty()) {
        jwriter << "N/A";
      } else {
        SeqnoToTimeMapping tmp;
        Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
        if (status.ok()) {
          jwriter << tmp.ToHumanString();
        } else {
          jwriter << "Invalid";
        }
      }

      // user collected properties
      for (const auto& prop : table_properties.readable_properties) {
        jwriter << prop.first << prop.second;
      }
      jwriter.EndObject();
    }

    if (oldest_blob_file_number != kInvalidBlobFileNumber) {
      jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
    }

    jwriter.EndObject();

    event_logger->Log(jwriter);
  }

  if (listeners.empty()) {
    return;
  }
  TableFileCreationInfo info;
  info.db_name = db_name;
  info.cf_name = cf_name;
  info.file_path = file_path;
  info.file_size = fd.file_size;
  info.job_id = job_id;
  info.table_properties = table_properties;
  info.reason = reason;
  info.status = s;
  info.file_checksum = file_checksum;
  info.file_checksum_func_name = file_checksum_func_name;
  for (auto& listener : listeners) {
    listener->OnTableFileCreated(info);
  }
  info.status.PermitUncheckedError();
}

void EventHelpers::LogAndNotifyTableFileDeletion(
    EventLogger* event_logger, int job_id, uint64_t file_number,
    const std::string& file_path, const Status& status,
    const std::string& dbname,
    const std::vector<std::shared_ptr<EventListener>>& listeners) {
  JSONWriter jwriter;
  AppendCurrentTime(&jwriter);

  jwriter << "job" << job_id << "event"
          << "table_file_deletion"
          << "file_number" << file_number;
  if (!status.ok()) {
    jwriter << "status" << status.ToString();
  }

  jwriter.EndObject();

  event_logger->Log(jwriter);

  if (listeners.empty()) {
    return;
  }
  TableFileDeletionInfo info;
  info.db_name = dbname;
  info.job_id = job_id;
  info.file_path = file_path;
  info.status = status;
  for (auto& listener : listeners) {
    listener->OnTableFileDeleted(info);
  }
  info.status.PermitUncheckedError();
}

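// Illustrative only (inferred from the JSONWriter calls above, not copied
// from a real LOG): the event line emitted by LogAndNotifyTableFileDeletion
// looks roughly like
//
//   {"time_micros": 1700000000000000, "job": 12,
//    "event": "table_file_deletion", "file_number": 42}
//
// with a "status" field appended when the deletion Status is not OK.
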
void EventHelpers::NotifyOnErrorRecoveryEnd(
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const Status& old_bg_error, const Status& new_bg_error,
    InstrumentedMutex* db_mutex) {
  if (!listeners.empty()) {
    db_mutex->AssertHeld();
    // release lock while notifying events
    db_mutex->Unlock();
    TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
    TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
    for (auto& listener : listeners) {
      BackgroundErrorRecoveryInfo info;
      info.old_bg_error = old_bg_error;
      info.new_bg_error = new_bg_error;
      listener->OnErrorRecoveryCompleted(old_bg_error);
      listener->OnErrorRecoveryEnd(info);
      info.old_bg_error.PermitUncheckedError();
      info.new_bg_error.PermitUncheckedError();
    }
    db_mutex->Lock();
  } else {
    old_bg_error.PermitUncheckedError();
  }
}

void EventHelpers::NotifyBlobFileCreationStarted(
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const std::string& db_name, const std::string& cf_name,
    const std::string& file_path, int job_id,
    BlobFileCreationReason creation_reason) {
  if (listeners.empty()) {
    return;
  }
  BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
                                 creation_reason);
  for (const auto& listener : listeners) {
    listener->OnBlobFileCreationStarted(info);
  }
}

void EventHelpers::LogAndNotifyBlobFileCreationFinished(
    EventLogger* event_logger,
    const std::vector<std::shared_ptr<EventListener>>& listeners,
    const std::string& db_name, const std::string& cf_name,
    const std::string& file_path, int job_id, uint64_t file_number,
    BlobFileCreationReason creation_reason, const Status& s,
    const std::string& file_checksum,
    const std::string& file_checksum_func_name, uint64_t total_blob_count,
    uint64_t total_blob_bytes) {
  if (s.ok() && event_logger) {
    JSONWriter jwriter;
    AppendCurrentTime(&jwriter);
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
            << "blob_file_creation"
            << "file_number" << file_number << "total_blob_count"
            << total_blob_count << "total_blob_bytes" << total_blob_bytes
            << "file_checksum" << file_checksum << "file_checksum_func_name"
            << file_checksum_func_name << "status" << s.ToString();

    jwriter.EndObject();
    event_logger->Log(jwriter);
  }

  if (listeners.empty()) {
    return;
  }
  BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
                            creation_reason, total_blob_count, total_blob_bytes,
                            s, file_checksum, file_checksum_func_name);
  for (const auto& listener : listeners) {
    listener->OnBlobFileCreated(info);
  }
  info.status.PermitUncheckedError();
}

void EventHelpers::LogAndNotifyBlobFileDeletion(
    EventLogger* event_logger,
    const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
    uint64_t file_number, const std::string& file_path, const Status& status,
    const std::string& dbname) {
  if (event_logger) {
    JSONWriter jwriter;
    AppendCurrentTime(&jwriter);

    jwriter << "job" << job_id << "event"
            << "blob_file_deletion"
            << "file_number" << file_number;
    if (!status.ok()) {
      jwriter << "status" << status.ToString();
    }

    jwriter.EndObject();
    event_logger->Log(jwriter);
  }
  if (listeners.empty()) {
    return;
  }
  BlobFileDeletionInfo info(dbname, file_path, job_id, status);
  for (const auto& listener : listeners) {
    listener->OnBlobFileDeleted(info);
  }
  info.status.PermitUncheckedError();
}

}  // namespace ROCKSDB_NAMESPACE