diff --git a/db/compaction_job.cc b/db/compaction_job.cc index d59ae6eafb..d79632fee2 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -205,10 +205,11 @@ CompactionJob::CompactionJob( std::vector existing_snapshots, std::shared_ptr table_cache, std::function yield_callback, EventLogger* event_logger, - bool paranoid_file_checks) + bool paranoid_file_checks, const std::string& dbname) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_stats_(1), + dbname_(dbname), db_options_(db_options), env_options_(env_options), env_(db_options.env), @@ -1020,13 +1021,9 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { } else { compact_->builder->Abandon(); } - if (s.ok()) { - table_properties = compact_->builder->GetTableProperties(); - } const uint64_t current_bytes = compact_->builder->FileSize(); compact_->current_output()->file_size = current_bytes; compact_->total_bytes += current_bytes; - compact_->builder.reset(); // Finish and check for file errors if (s.ok() && !db_options_.disableDataSync) { @@ -1058,16 +1055,23 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { delete iter; if (s.ok()) { + TableFileCreationInfo info(compact_->builder->GetTableProperties()); + info.db_name = dbname_; + info.cf_name = cfd->GetName(); + info.file_path = TableFileName(cfd->ioptions()->db_paths, + fd.GetNumber(), fd.GetPathId()); + info.file_size = fd.GetFileSize(); + info.job_id = job_id_; Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 " keys, %" PRIu64 " bytes", cfd->GetName().c_str(), job_id_, output_number, current_entries, current_bytes); - EventHelpers::LogTableFileCreation(event_logger_, job_id_, - output_number, current_bytes, - table_properties); + EventHelpers::LogAndNotifyTableFileCreation( + event_logger_, cfd->ioptions()->listeners, fd, info); } } + compact_->builder.reset(); return s; } diff --git a/db/compaction_job.h b/db/compaction_job.h index d34e4bdade..00e92f23f3 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -58,7 +58,8 @@ class CompactionJob { std::vector existing_snapshots, std::shared_ptr table_cache, std::function yield_callback, - EventLogger* event_logger, bool paranoid_file_checks); + EventLogger* event_logger, bool paranoid_file_checks, + const std::string& dbname); ~CompactionJob(); @@ -111,6 +112,7 @@ class CompactionJob { InternalStats::CompactionStats compaction_stats_; // DBImpl state + const std::string& dbname_; const DBOptions& db_options_; const EnvOptions& env_options_; Env* env_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index e4c407a789..4460f6859d 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -166,7 +166,8 @@ TEST_F(CompactionJobTest, Simple) { CompactionJob compaction_job(0, compaction.get(), db_options_, env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr, nullptr, {}, table_cache_, - std::move(yield_callback), &event_logger, false); + std::move(yield_callback), &event_logger, false, + "dbname"); compaction_job.Prepare(); mutex_.Unlock(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 2972024002..c4df365ce7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1162,13 +1162,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, cfd->GetLatestMutableCFOptions()->paranoid_file_checks; { mutex_.Unlock(); + TableFileCreationInfo info; s = BuildTable( dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), iter.get(), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), newest_snapshot, earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()), cfd->ioptions()->compression_opts, paranoid_file_checks, Env::IO_HIGH, - &table_properties); + &info.table_properties); LogFlush(db_options_.info_log); Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" @@ -1178,9 +1179,15 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, // output to event logger if (s.ok()) { - EventHelpers::LogTableFileCreation( - &event_logger_, job_id, meta.fd.GetNumber(), meta.fd.GetFileSize(), - table_properties); + info.db_name = dbname_; + info.cf_name = cfd->GetName(); + info.file_path = TableFileName(db_options_.db_paths, + meta.fd.GetNumber(), + meta.fd.GetPathId()); + info.file_size = meta.fd.GetFileSize(); + info.job_id = job_id; + EventHelpers::LogAndNotifyTableFileCreation( + &event_logger_, db_options_.listeners, meta.fd, info); } mutex_.Lock(); } @@ -1222,6 +1229,13 @@ Status DBImpl::FlushMemTableToOutputFile( &event_logger_); uint64_t file_number; + + // Within flush_job.Run, rocksdb may call event listener to notify + // file creation and deletion. + // + // Note that flush_job.Run will unlock and lock the db_mutex, + // and EventListener callback will be called when the db_mutex + // is unlocked by the current thread. Status s = flush_job.Run(&file_number); if (s.ok()) { @@ -1516,12 +1530,14 @@ Status DBImpl::CompactFilesImpl( &shutting_down_, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()), stats_, snapshots_.GetAll(), table_cache_, std::move(yield_callback), - &event_logger_, c->mutable_cf_options()->paranoid_file_checks); + &event_logger_, c->mutable_cf_options()->paranoid_file_checks, + dbname_); compaction_job.Prepare(); mutex_.Unlock(); Status status = compaction_job.Run(); mutex_.Lock(); + compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { InstallSuperVersionBackground(c->column_family_data(), job_context, @@ -2439,11 +2455,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), directories_.GetDataDir(c->GetOutputPathId()), stats_, snapshots_.GetAll(), table_cache_, std::move(yield_callback), - &event_logger_, c->mutable_cf_options()->paranoid_file_checks); + &event_logger_, c->mutable_cf_options()->paranoid_file_checks, + dbname_); compaction_job.Prepare(); + mutex_.Unlock(); status = compaction_job.Run(); mutex_.Lock(); + compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { InstallSuperVersionBackground(c->column_family_data(), job_context, diff --git a/db/db_impl.h b/db/db_impl.h index 81ed7482eb..333f26d3b7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -22,6 +22,8 @@ #include "db/log_writer.h" #include "db/snapshot.h" #include "db/column_family.h" +#include "db/compaction_job.h" +#include "db/flush_job.h" #include "db/version_edit.h" #include "db/wal_manager.h" #include "db/writebuffer.h" diff --git a/db/event_helpers.cc b/db/event_helpers.cc index 0eb4d543f7..2f9b9c48a5 100644 --- a/db/event_helpers.cc +++ b/db/event_helpers.cc @@ -17,15 +17,19 @@ void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) { std::chrono::system_clock::now().time_since_epoch()).count(); } -void EventHelpers::LogTableFileCreation( - EventLogger* event_logger, int job_id, uint64_t file_number, - uint64_t file_size, const TableProperties& table_properties) { +// TODO(yhchiang): change the API to directly take TableFileCreationInfo +void EventHelpers::LogAndNotifyTableFileCreation( + EventLogger* event_logger, + const std::vector>& listeners, + const FileDescriptor& fd, const TableFileCreationInfo& info) { + assert(event_logger); JSONWriter jwriter; AppendCurrentTime(&jwriter); - jwriter << "job" << job_id - << "event" << "table_file_creation" - << "file_number" << file_number - << "file_size" << file_size; + jwriter << "cf_name" << info.cf_name + << "job" << info.job_id + << "event" << "table_file_creation" + << "file_number" << fd.GetNumber() + << "file_size" << fd.GetFileSize(); // table_properties { @@ -33,22 +37,24 @@ void EventHelpers::LogTableFileCreation( jwriter.StartObject(); // basic properties: - jwriter << "data_size" << table_properties.data_size - << "index_size" << table_properties.index_size - << "filter_size" << table_properties.filter_size - << "raw_key_size" << table_properties.raw_key_size + jwriter << "data_size" << info.table_properties.data_size + << "index_size" << info.table_properties.index_size + << "filter_size" << info.table_properties.filter_size + << "raw_key_size" << info.table_properties.raw_key_size << "raw_average_key_size" << SafeDivide( - table_properties.raw_key_size, - table_properties.num_entries) - << "raw_value_size" << table_properties.raw_value_size + info.table_properties.raw_key_size, + info.table_properties.num_entries) + << "raw_value_size" << info.table_properties.raw_value_size << "raw_average_value_size" << SafeDivide( - table_properties.raw_value_size, table_properties.num_entries) - << "num_data_blocks" << table_properties.num_data_blocks - << "num_entries" << table_properties.num_entries - << "filter_policy_name" << table_properties.filter_policy_name; + info.table_properties.raw_value_size, + info.table_properties.num_entries) + << "num_data_blocks" << info.table_properties.num_data_blocks + << "num_entries" << info.table_properties.num_entries + << "filter_policy_name" << + info.table_properties.filter_policy_name; // user collected properties - for (const auto& prop : table_properties.user_collected_properties) { + for (const auto& prop : info.table_properties.user_collected_properties) { jwriter << prop.first << prop.second; } jwriter.EndObject(); @@ -56,6 +62,16 @@ void EventHelpers::LogTableFileCreation( jwriter.EndObject(); event_logger->Log(jwriter); + +#ifndef ROCKSDB_LITE + if (listeners.size() == 0) { + return; + } + + for (auto listener : listeners) { + listener->OnTableFileCreated(info); + } +#endif // !ROCKSDB_LITE } } // namespace rocksdb diff --git a/db/event_helpers.h b/db/event_helpers.h index 57c7975546..0a3843151b 100644 --- a/db/event_helpers.h +++ b/db/event_helpers.h @@ -4,16 +4,25 @@ // of patent rights can be found in the PATENTS file in the same directory. #pragma once -#include "util/event_logger.h" +#include +#include +#include + +#include "db/column_family.h" +#include "db/version_edit.h" +#include "rocksdb/listener.h" #include "rocksdb/table_properties.h" +#include "util/event_logger.h" namespace rocksdb { class EventHelpers { public: static void AppendCurrentTime(JSONWriter* json_writer); - static void LogTableFileCreation(EventLogger* event_logger, int job_id, - uint64_t file_number, uint64_t file_size, - const TableProperties& table_properties); + static void LogAndNotifyTableFileCreation( + EventLogger* event_logger, + const std::vector>& listeners, + const FileDescriptor& fd, const TableFileCreationInfo& info); }; + } // namespace rocksdb diff --git a/db/flush_job.cc b/db/flush_job.cc index 820548675c..c59d56cef7 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -222,7 +222,7 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, << total_num_entries << "num_deletes" << total_num_deletes << "memory_usage" << total_memory_usage; - TableProperties table_properties; + TableFileCreationInfo info; { ScopedArenaIterator iter( NewMergingIterator(&cfd_->internal_comparator(), &memtables[0], @@ -240,7 +240,7 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, earliest_seqno_in_memtable, output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, Env::IO_HIGH, - &table_properties); + &info.table_properties); LogFlush(db_options_.info_log); } Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, @@ -250,9 +250,16 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, // output to event logger if (s.ok()) { - EventHelpers::LogTableFileCreation( - event_logger_, job_context_->job_id, meta.fd.GetNumber(), - meta.fd.GetFileSize(), table_properties); + info.db_name = dbname_; + info.cf_name = cfd_->GetName(); + info.file_path = TableFileName(db_options_.db_paths, + meta.fd.GetNumber(), + meta.fd.GetPathId()); + info.file_size = meta.fd.GetFileSize(); + info.job_id = job_context_->job_id; + EventHelpers::LogAndNotifyTableFileCreation( + event_logger_, db_options_.listeners, + meta.fd, info); } if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { diff --git a/db/listener_test.cc b/db/listener_test.cc index a605bffe94..a550d12240 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -208,27 +208,48 @@ TEST_F(EventListenerTest, OnSingleDBCompactionTest) { } } +// This simple Listener can only handle one flush at a time. class TestFlushListener : public EventListener { public: + void OnTableFileCreated( + const TableFileCreationInfo& info) { + db_name_ = info.db_name; + cf_name_ = info.cf_name; + file_path_ = info.file_path; + ASSERT_GT(info.table_properties.data_size, 0U); + ASSERT_GT(info.table_properties.raw_key_size, 0U); + ASSERT_GT(info.table_properties.raw_value_size, 0U); + ASSERT_GT(info.table_properties.num_data_blocks, 0U); + ASSERT_GT(info.table_properties.num_entries, 0U); + } + void OnFlushCompleted( - DB* db, const std::string& name, + DB* db, const std::string& cf_name, const std::string& file_path, bool triggered_writes_slowdown, bool triggered_writes_stop) override { flushed_dbs_.push_back(db); - flushed_column_family_names_.push_back(name); + flushed_column_family_names_.push_back(cf_name); if (triggered_writes_slowdown) { slowdown_count++; } if (triggered_writes_stop) { stop_count++; } + // verify the file created matches the flushed file. + ASSERT_EQ(db_name_, db->GetName()); + ASSERT_EQ(cf_name_, cf_name); + ASSERT_GT(file_path.size(), 0U); + ASSERT_EQ(file_path, file_path_); } std::vector flushed_column_family_names_; std::vector flushed_dbs_; int slowdown_count; int stop_count; + std::string db_name_; + std::string cf_name_; + std::string file_path_; }; TEST_F(EventListenerTest, OnSingleDBFlushTest) { diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 018f3832a5..60b3e0e916 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -9,6 +9,7 @@ #include #include #include "rocksdb/status.h" +#include "rocksdb/table_properties.h" namespace rocksdb { @@ -28,6 +29,25 @@ struct CompactionJobInfo { std::vector output_files; }; +struct TableFileCreationInfo { + TableFileCreationInfo() = default; + explicit TableFileCreationInfo(TableProperties&& prop) : + table_properties(prop) {} + // the name of the database where the file was created + std::string db_name; + // the name of the column family where the file was created. + std::string cf_name; + // the path to the created file. + std::string file_path; + // the size of the file. + uint64_t file_size; + // the id of the job (which could be flush or compaction) that + // created the file. + int job_id; + // Detailed properties of the created file. + TableProperties table_properties; +}; + // EventListener class contains a set of call-back functions that will // be called when specific RocksDB event happens such as flush. It can // be used as a building block for developing custom features such as @@ -99,6 +119,21 @@ class EventListener { // after this function is returned, and must be copied if it is needed // outside of this function. virtual void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) {} + + // A call-back function for RocksDB which will be called whenever + // a SST file is created. Different from OnCompactionCompleted and + // OnFlushCompleted, this call-back is designed for external logging + // service and thus only provide string parameters instead + // of a pointer to DB. Applications that build logic basic based + // on file creations and deletions is suggested to implement + // OnFlushCompleted and OnCompactionCompleted. + // + // Note that if applications would like to use the passed reference + // outside this function call, they should make copies from these + // returned value. + virtual void OnTableFileCreated( + const TableFileCreationInfo& info) {} + virtual ~EventListener() {} }; diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 8572021e06..9a515cf45d 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -24,7 +24,7 @@ namespace rocksdb { // ++pos) { // ... // } -typedef std::map UserCollectedProperties; +typedef std::map UserCollectedProperties; // TableProperties contains a bunch of read-only properties of its associated // table. diff --git a/tools/db_stress.cc b/tools/db_stress.cc index e94028ab5f..c4bb9cc3dc 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -821,6 +821,18 @@ class DbStressListener : public EventListener { std::chrono::microseconds(rand_.Uniform(5000))); } + virtual void OnTableFileCreated( + const TableFileCreationInfo& info) override { + assert(info.db_name == db_name_); + assert(IsValidColumnFamilyName(info.cf_name)); + VerifyFilePath(info.file_path); + assert(info.file_size > 0); + assert(info.job_id > 0); + assert(info.table_properties.data_size > 0); + assert(info.table_properties.raw_key_size > 0); + assert(info.table_properties.num_entries > 0); + } + protected: bool IsValidColumnFamilyName(const std::string& cf_name) const { if (cf_name == kDefaultColumnFamilyName) {