mirror of
https://github.com/facebook/rocksdb.git
synced 2024-12-04 20:02:50 +00:00
c73cf7a878
Summary: This PR adds user property collector factory `CompactForTieringCollectorFactory` to support observe SST file and mark it as need compaction for fast tracking data to the proper tier. A triggering ratio `compaction_trigger_ratio_` can be configured to achieve the following: 1) Setting the ratio to be equal to or smaller than 0 disables this collector 2) Setting the ratio to be within (0, 1] will write the number of observed eligible entries into a user property and marks a file as need-compaction when aforementioned condition is met. 3) Setting the ratio to be higher than 1 can be used to just writes the user table property, and not mark any file as need compaction. For a column family that does not enable tiering feature, even if an effective configuration is provided, this collector is still disabled. For a file that is already on the last level, this collector is also disabled. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12760 Test Plan: Added unit tests Reviewed By: pdillinger Differential Revision: D58734976 Pulled By: jowlyzhang fbshipit-source-id: 6daab2c4f62b5c6689c3c03e3b3907bbbe6b7a81
241 lines
10 KiB
C++
241 lines
10 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#pragma once
|
|
|
|
#include <atomic>
|
|
#include <deque>
|
|
#include <limits>
|
|
#include <list>
|
|
#include <set>
|
|
#include <string>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "db/blob/blob_file_completion_callback.h"
|
|
#include "db/column_family.h"
|
|
#include "db/flush_scheduler.h"
|
|
#include "db/internal_stats.h"
|
|
#include "db/job_context.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/logs_with_prep_tracker.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/seqno_to_time_mapping.h"
|
|
#include "db/snapshot_impl.h"
|
|
#include "db/version_edit.h"
|
|
#include "db/write_controller.h"
|
|
#include "db/write_thread.h"
|
|
#include "logging/event_logger.h"
|
|
#include "monitoring/instrumented_mutex.h"
|
|
#include "options/db_options.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "rocksdb/memtablerep.h"
|
|
#include "rocksdb/transaction_log.h"
|
|
#include "util/autovector.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/thread_local.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class DBImpl;
|
|
class MemTable;
|
|
class SnapshotChecker;
|
|
class TableCache;
|
|
class Version;
|
|
class VersionEdit;
|
|
class VersionSet;
|
|
class Arena;
|
|
|
|
class FlushJob {
|
|
public:
|
|
// TODO(icanadi) make effort to reduce number of parameters here
|
|
// IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
|
|
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
|
|
const ImmutableDBOptions& db_options,
|
|
const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
|
|
const FileOptions& file_options, VersionSet* versions,
|
|
InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
|
|
std::vector<SequenceNumber> existing_snapshots,
|
|
SequenceNumber earliest_write_conflict_snapshot,
|
|
SnapshotChecker* snapshot_checker, JobContext* job_context,
|
|
FlushReason flush_reason, LogBuffer* log_buffer,
|
|
FSDirectory* db_directory, FSDirectory* output_file_directory,
|
|
CompressionType output_compression, Statistics* stats,
|
|
EventLogger* event_logger, bool measure_io_stats,
|
|
const bool sync_output_directory, const bool write_manifest,
|
|
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
|
|
std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
|
|
const std::string& db_id = "", const std::string& db_session_id = "",
|
|
std::string full_history_ts_low = "",
|
|
BlobFileCompletionCallback* blob_callback = nullptr);
|
|
|
|
~FlushJob();
|
|
|
|
// Require db_mutex held.
|
|
// Once PickMemTable() is called, either Run() or Cancel() has to be called.
|
|
void PickMemTable();
|
|
// @param skip_since_bg_error If not nullptr and if atomic_flush=false,
|
|
// then it is set to true if flush installation is skipped and memtable
|
|
// is rolled back due to existing background error.
|
|
Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
|
|
FileMetaData* file_meta = nullptr,
|
|
bool* switched_to_mempurge = nullptr,
|
|
bool* skipped_since_bg_error = nullptr,
|
|
ErrorHandler* error_handler = nullptr);
|
|
void Cancel();
|
|
const autovector<MemTable*>& GetMemTables() const { return mems_; }
|
|
|
|
std::list<std::unique_ptr<FlushJobInfo>>* GetCommittedFlushJobsInfo() {
|
|
return &committed_flush_jobs_info_;
|
|
}
|
|
|
|
private:
|
|
friend class FlushJobTest_GetRateLimiterPriorityForWrite_Test;
|
|
|
|
void ReportStartedFlush();
|
|
void ReportFlushInputSize(const autovector<MemTable*>& mems);
|
|
void RecordFlushIOStats();
|
|
Status WriteLevel0Table();
|
|
|
|
// Memtable Garbage Collection algorithm: a MemPurge takes the list
|
|
// of immutable memtables and filters out (or "purge") the outdated bytes
|
|
// out of it. The output (the filtered bytes, or "useful payload") is
|
|
// then transfered into a new memtable. If this memtable is filled, then
|
|
// the mempurge is aborted and rerouted to a regular flush process. Else,
|
|
// depending on the heuristics, placed onto the immutable memtable list.
|
|
// The addition to the imm list will not trigger a flush operation. The
|
|
// flush of the imm list will instead be triggered once the mutable memtable
|
|
// is added to the imm list.
|
|
// This process is typically intended for workloads with heavy overwrites
|
|
// when we want to avoid SSD writes (and reads) as much as possible.
|
|
// "MemPurge" is an experimental feature still at a very early stage
|
|
// of development. At the moment it is only compatible with the Get, Put,
|
|
// Delete operations as well as Iterators and CompactionFilters.
|
|
// For this early version, "MemPurge" is called by setting the
|
|
// options.experimental_mempurge_threshold value as >0.0. When this is
|
|
// the case, ALL automatic flush operations (kWRiteBufferManagerFull) will
|
|
// first go through the MemPurge process. Therefore, we strongly
|
|
// recommend all users not to set this flag as true given that the MemPurge
|
|
// process has not matured yet.
|
|
Status MemPurge();
|
|
bool MemPurgeDecider(double threshold);
|
|
// The rate limiter priority (io_priority) is determined dynamically here.
|
|
Env::IOPriority GetRateLimiterPriority();
|
|
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
|
|
|
|
// Require db_mutex held.
|
|
// Called only when UDT feature is enabled and
|
|
// `persist_user_defined_timestamps` flag is false. Because we will refrain
|
|
// from flushing as long as there are still UDTs in a memtable that hasn't
|
|
// expired w.r.t `full_history_ts_low`. However, flush is continued if there
|
|
// is risk of entering write stall mode. In that case, we need
|
|
// to track the effective cutoff timestamp below which all the udts are
|
|
// removed because of flush, and use it to increase `full_history_ts_low` if
|
|
// the effective cutoff timestamp is newer. See
|
|
// `MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT` for details.
|
|
void GetEffectiveCutoffUDTForPickedMemTables();
|
|
|
|
// If this column family enables tiering feature, it will find the current
|
|
// `preclude_last_level_min_seqno_`, and the smaller one between this and
|
|
// the `earliset_snapshot_` will later be announced to user property
|
|
// collectors. It indicates to tiering use cases which data are old enough to
|
|
// be placed on the last level.
|
|
void GetPrecludeLastLevelMinSeqno();
|
|
|
|
Status MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
|
|
|
|
const std::string& dbname_;
|
|
const std::string db_id_;
|
|
const std::string db_session_id_;
|
|
ColumnFamilyData* cfd_;
|
|
const ImmutableDBOptions& db_options_;
|
|
const MutableCFOptions& mutable_cf_options_;
|
|
// A variable storing the largest memtable id to flush in this
|
|
// flush job. RocksDB uses this variable to select the memtables to flush in
|
|
// this job. All memtables in this column family with an ID smaller than or
|
|
// equal to max_memtable_id_ will be selected for flush.
|
|
uint64_t max_memtable_id_;
|
|
FileOptions file_options_;
|
|
VersionSet* versions_;
|
|
InstrumentedMutex* db_mutex_;
|
|
std::atomic<bool>* shutting_down_;
|
|
std::vector<SequenceNumber> existing_snapshots_;
|
|
SequenceNumber earliest_snapshot_;
|
|
SequenceNumber earliest_write_conflict_snapshot_;
|
|
SnapshotChecker* snapshot_checker_;
|
|
JobContext* job_context_;
|
|
FlushReason flush_reason_;
|
|
LogBuffer* log_buffer_;
|
|
FSDirectory* db_directory_;
|
|
FSDirectory* output_file_directory_;
|
|
CompressionType output_compression_;
|
|
Statistics* stats_;
|
|
EventLogger* event_logger_;
|
|
TableProperties table_properties_;
|
|
bool measure_io_stats_;
|
|
// True if this flush job should call fsync on the output directory. False
|
|
// otherwise.
|
|
// Usually sync_output_directory_ is true. A flush job needs to call sync on
|
|
// the output directory before committing to the MANIFEST.
|
|
// However, an individual flush job does not have to call sync on the output
|
|
// directory if it is part of an atomic flush. After all flush jobs in the
|
|
// atomic flush succeed, call sync once on each distinct output directory.
|
|
const bool sync_output_directory_;
|
|
// True if this flush job should write to MANIFEST after successfully
|
|
// flushing memtables. False otherwise.
|
|
// Usually write_manifest_ is true. A flush job commits to the MANIFEST after
|
|
// flushing the memtables.
|
|
// However, an individual flush job cannot rashly write to the MANIFEST
|
|
// immediately after it finishes the flush if it is part of an atomic flush.
|
|
// In this case, only after all flush jobs succeed in flush can RocksDB
|
|
// commit to the MANIFEST.
|
|
const bool write_manifest_;
|
|
// The current flush job can commit flush result of a concurrent flush job.
|
|
// We collect FlushJobInfo of all jobs committed by current job and fire
|
|
// OnFlushCompleted for them.
|
|
std::list<std::unique_ptr<FlushJobInfo>> committed_flush_jobs_info_;
|
|
|
|
// Variables below are set by PickMemTable():
|
|
FileMetaData meta_;
|
|
autovector<MemTable*> mems_;
|
|
VersionEdit* edit_;
|
|
Version* base_;
|
|
bool pick_memtable_called;
|
|
Env::Priority thread_pri_;
|
|
|
|
const std::shared_ptr<IOTracer> io_tracer_;
|
|
SystemClock* clock_;
|
|
|
|
const std::string full_history_ts_low_;
|
|
BlobFileCompletionCallback* blob_callback_;
|
|
|
|
// Shared copy of DB's seqno to time mapping stored in SuperVersion. The
|
|
// ownership is shared with this FlushJob when it's created.
|
|
// FlushJob accesses and ref counts immutable MemTables directly via
|
|
// `MemTableListVersion` instead of ref `SuperVersion`, so we need to give
|
|
// the flush job shared ownership of the mapping.
|
|
// Note this is only installed when seqno to time recording feature is
|
|
// enables, so it could be nullptr.
|
|
std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
|
|
|
|
// Keeps track of the newest user-defined timestamp for this flush job if
|
|
// `persist_user_defined_timestamps` flag is false.
|
|
std::string cutoff_udt_;
|
|
|
|
// The current minimum seqno that compaction jobs will preclude the data from
|
|
// the last level. Data with seqnos larger than this or larger than
|
|
// `earliest_snapshot_` will be output to the penultimate level had it gone
|
|
// through a compaction to the last level.
|
|
SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
|
|
};
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|