diff --git a/HISTORY.md b/HISTORY.md index 9f2d840d42..84f5d2a2df 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ * Clarified the required semantics of Read() functions in FileSystem and Env APIs. Please ensure any custom implementations are compliant. * For the new integrated BlobDB implementation, compaction statistics now include the amount of data read from blob files during compaction (due to garbage collection or compaction filters). Write amplification metrics have also been extended to account for data read from blob files. * Add EqualWithoutTimestamp() to Comparator. +* Extend support to track blob files in SstFileManager whenever a blob file is created/deleted. Blob files will be scheduled for deletion via SstFileManager, and SstFileManager will now take blob files into account, along with SST files, when calculating size and space limits. ### New Features * Support compaction filters for the new implementation of BlobDB. Add `FilterBlobByKey()` to `CompactionFilter`. Subclasses can override this method so that compaction filters can determine whether the actual blob value has to be read during compaction. Use a new `kUndetermined` in `CompactionFilter::Decision` to indicated that further action is necessary for compaction filter to make a decision. 
diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 6f085feffa..4ef836a6bd 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -8,6 +8,7 @@ #include #include "db/blob/blob_file_addition.h" +#include "db/blob/blob_file_completion_callback.h" #include "db/blob/blob_index.h" #include "db/blob/blob_log_format.h" #include "db/blob/blob_log_writer.h" @@ -34,12 +35,13 @@ BlobFileBuilder::BlobFileBuilder( const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, + BlobFileCompletionCallback* blob_callback, std::vector* blob_file_paths, std::vector* blob_file_additions) : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs, immutable_cf_options, mutable_cf_options, file_options, job_id, column_family_id, column_family_name, io_priority, - write_hint, io_tracer, blob_file_paths, + write_hint, io_tracer, blob_callback, blob_file_paths, blob_file_additions) {} BlobFileBuilder::BlobFileBuilder( @@ -50,6 +52,7 @@ BlobFileBuilder::BlobFileBuilder( const std::string& column_family_name, Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, + BlobFileCompletionCallback* blob_callback, std::vector* blob_file_paths, std::vector* blob_file_additions) : file_number_generator_(std::move(file_number_generator)), @@ -65,6 +68,7 @@ BlobFileBuilder::BlobFileBuilder( io_priority_(io_priority), write_hint_(write_hint), io_tracer_(io_tracer), + blob_callback_(blob_callback), blob_file_paths_(blob_file_paths), blob_file_additions_(blob_file_additions), blob_count_(0), @@ -303,11 +307,15 @@ Status BlobFileBuilder::CloseBlobFile() { column_family_name_.c_str(), job_id_, blob_file_number, blob_count_, blob_bytes_); + if (blob_callback_) { + s = blob_callback_->OnBlobFileCompleted(blob_file_paths_->back()); + } + writer_.reset(); blob_count_ = 0; blob_bytes_ = 0; - return Status::OK(); + return s; } 
Status BlobFileBuilder::CloseBlobFileIfNeeded() { @@ -323,4 +331,20 @@ Status BlobFileBuilder::CloseBlobFileIfNeeded() { return CloseBlobFile(); } +void BlobFileBuilder::Abandon() { + if (!IsBlobFileOpen()) { + return; + } + + if (blob_callback_) { + // BlobFileBuilder::Abandon() is called because of error while writing to + // Blob files. So we can ignore the below error. + blob_callback_->OnBlobFileCompleted(blob_file_paths_->back()) + .PermitUncheckedError(); + } + + writer_.reset(); + blob_count_ = 0; + blob_bytes_ = 0; +} } // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h index 7ee7064e88..965314379f 100644 --- a/db/blob/blob_file_builder.h +++ b/db/blob/blob_file_builder.h @@ -27,6 +27,7 @@ class Status; class Slice; class BlobLogWriter; class IOTracer; +class BlobFileCompletionCallback; class BlobFileBuilder { public: @@ -39,6 +40,7 @@ class BlobFileBuilder { Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, + BlobFileCompletionCallback* blob_callback, std::vector* blob_file_paths, std::vector* blob_file_additions); @@ -52,6 +54,7 @@ class BlobFileBuilder { Env::IOPriority io_priority, Env::WriteLifeTimeHint write_hint, const std::shared_ptr& io_tracer, + BlobFileCompletionCallback* blob_callback, std::vector* blob_file_paths, std::vector* blob_file_additions); @@ -62,6 +65,7 @@ class BlobFileBuilder { Status Add(const Slice& key, const Slice& value, std::string* blob_index); Status Finish(); + void Abandon(); private: bool IsBlobFileOpen() const; @@ -85,6 +89,7 @@ class BlobFileBuilder { Env::IOPriority io_priority_; Env::WriteLifeTimeHint write_hint_; std::shared_ptr io_tracer_; + BlobFileCompletionCallback* blob_callback_; std::vector* blob_file_paths_; std::vector* blob_file_additions_; std::unique_ptr writer_; diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc index 11af00e5de..75489298c8 100644 --- 
a/db/blob/blob_file_builder_test.cc +++ b/db/blob/blob_file_builder_test.cc @@ -144,8 +144,9 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr /*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -228,8 +229,9 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr /*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); std::vector> expected_key_value_pairs( number_of_blobs); @@ -314,8 +316,9 @@ TEST_F(BlobFileBuilderTest, InlinedValues) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr /*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); for (size_t i = 0; i < number_of_blobs; ++i) { const std::string key = std::to_string(i); @@ -367,8 +370,9 @@ TEST_F(BlobFileBuilderTest, Compression) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr 
/*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string uncompressed_value(value_size, 'x'); @@ -449,8 +453,9 @@ TEST_F(BlobFileBuilderTest, CompressionError) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr /*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue", [](void* arg) { @@ -527,8 +532,9 @@ TEST_F(BlobFileBuilderTest, Checksum) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr /*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); const std::string key("1"); const std::string value("deadbeef"); @@ -623,8 +629,9 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) { BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options, &mutable_cf_options, &file_options_, job_id, column_family_id, column_family_name, io_priority, - write_hint, nullptr /*IOTracer*/, &blob_file_paths, - &blob_file_additions); + write_hint, nullptr /*IOTracer*/, + nullptr /*BlobFileCompletionCallback*/, + &blob_file_paths, &blob_file_additions); SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) { Status* const s = static_cast(arg); diff --git a/db/blob/blob_file_completion_callback.h b/db/blob/blob_file_completion_callback.h new file mode 100644 index 0000000000..58d708eb1c --- /dev/null +++ b/db/blob/blob_file_completion_callback.h @@ -0,0 +1,53 @@ +// Copyright (c) 2011-present, 
Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "db/error_handler.h" +#include "file/sst_file_manager_impl.h" +#include "rocksdb/status.h" + +namespace ROCKSDB_NAMESPACE { + +class BlobFileCompletionCallback { + public: + BlobFileCompletionCallback(SstFileManager* sst_file_manager, + InstrumentedMutex* mutex, + ErrorHandler* error_handler) + : sst_file_manager_(sst_file_manager), + mutex_(mutex), + error_handler_(error_handler) {} + + Status OnBlobFileCompleted(const std::string& file_name) { + Status s; + +#ifndef ROCKSDB_LITE + auto sfm = static_cast(sst_file_manager_); + if (sfm) { + // Report new blob files to SstFileManagerImpl + s = sfm->OnAddFile(file_name); + if (sfm->IsMaxAllowedSpaceReached()) { + s = Status::SpaceLimit("Max allowed space was reached"); + TEST_SYNC_POINT( + "BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached"); + InstrumentedMutexLock l(mutex_); + error_handler_->SetBGError(s, BackgroundErrorReason::kFlush); + } + } +#else + (void)file_name; +#endif // ROCKSDB_LITE + return s; + } + + private: + SstFileManager* sst_file_manager_; + InstrumentedMutex* mutex_; + ErrorHandler* error_handler_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/db/blob/db_blob_compaction_test.cc b/db/blob/db_blob_compaction_test.cc index 8c02046f97..29f10f2e22 100644 --- a/db/blob/db_blob_compaction_test.cc +++ b/db/blob/db_blob_compaction_test.cc @@ -16,32 +16,6 @@ class DBBlobCompactionTest : public DBTestBase { explicit DBBlobCompactionTest() : DBTestBase("/db_blob_compaction_test", /*env_do_fsync=*/false) {} - // TODO: 
copied from DBCompactionTest. Should be de-duplicated in the future. - std::vector GetBlobFileNumbers() { - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); - assert(versions); - - ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); - assert(cfd); - - Version* const current = cfd->current(); - assert(current); - - const VersionStorageInfo* const storage_info = current->storage_info(); - assert(storage_info); - - const auto& blob_files = storage_info->GetBlobFiles(); - - std::vector result; - result.reserve(blob_files.size()); - - for (const auto& blob_file : blob_files) { - result.emplace_back(blob_file.first); - } - - return result; - } - #ifndef ROCKSDB_LITE const std::vector& GetCompactionStats() { VersionSet* const versions = dbfull()->TEST_GetVersionSet(); diff --git a/db/builder.cc b/db/builder.cc index 3a713432b1..ad30be735e 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -23,6 +23,7 @@ #include "db/range_del_aggregator.h" #include "db/table_cache.h" #include "db/version_edit.h" +#include "file/file_util.h" #include "file/filename.h" #include "file/read_write_util.h" #include "file/writable_file_writer.h" @@ -92,7 +93,8 @@ Status BuildTable( TableProperties* table_properties, int level, const uint64_t creation_time, const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint, const uint64_t file_creation_time, const std::string& db_id, - const std::string& db_session_id, const std::string* full_history_ts_low) { + const std::string& db_session_id, const std::string* full_history_ts_low, + BlobFileCompletionCallback* blob_callback) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -175,10 +177,11 @@ Status BuildTable( std::unique_ptr blob_file_builder( (mutable_cf_options.enable_blob_files && blob_file_additions) - ? 
new BlobFileBuilder( - versions, fs, &ioptions, &mutable_cf_options, &file_options, - job_id, column_family_id, column_family_name, io_priority, - write_hint, io_tracer, &blob_file_paths, blob_file_additions) + ? new BlobFileBuilder(versions, fs, &ioptions, &mutable_cf_options, + &file_options, job_id, column_family_id, + column_family_name, io_priority, write_hint, + io_tracer, blob_callback, &blob_file_paths, + blob_file_additions) : nullptr); CompactionIterator c_iter( @@ -278,8 +281,9 @@ Status BuildTable( if (blob_file_builder) { if (s.ok()) { s = blob_file_builder->Finish(); + } else { + blob_file_builder->Abandon(); } - blob_file_builder.reset(); } @@ -339,8 +343,10 @@ Status BuildTable( if (blob_file_additions) { for (const std::string& blob_file_path : blob_file_paths) { - ignored = fs->DeleteFile(blob_file_path, IOOptions(), dbg); + ignored = DeleteDBFile(&db_options, blob_file_path, dbname, + /*force_bg=*/false, /*force_fg=*/false); ignored.PermitUncheckedError(); + TEST_SYNC_POINT("BuildTable::AfterDeleteFile"); } } } diff --git a/db/builder.h b/db/builder.h index 990b10f3a8..d7a064fc26 100644 --- a/db/builder.h +++ b/db/builder.h @@ -38,6 +38,7 @@ class VersionEdit; class TableBuilder; class WritableFileWriter; class InternalStats; +class BlobFileCompletionCallback; // @param column_family_name Name of the column family that is also identified // by column_family_id, or empty string if unknown. 
It must outlive the @@ -90,6 +91,7 @@ extern Status BuildTable( Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET, const uint64_t file_creation_time = 0, const std::string& db_id = "", const std::string& db_session_id = "", - const std::string* full_history_ts_low = nullptr); + const std::string* full_history_ts_low = nullptr, + BlobFileCompletionCallback* blob_callback = nullptr); } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index df9e029f94..2dc6167d80 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -310,7 +310,8 @@ CompactionJob::CompactionJob( const std::string& dbname, CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused, const std::string& db_id, - const std::string& db_session_id, std::string full_history_ts_low) + const std::string& db_session_id, std::string full_history_ts_low, + BlobFileCompletionCallback* blob_callback) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), @@ -346,7 +347,8 @@ CompactionJob::CompactionJob( measure_io_stats_(measure_io_stats), write_hint_(Env::WLTH_NOT_SET), thread_pri_(thread_pri), - full_history_ts_low_(std::move(full_history_ts_low)) { + full_history_ts_low_(std::move(full_history_ts_low)), + blob_callback_(blob_callback) { assert(compaction_job_stats_ != nullptr); assert(log_buffer_ != nullptr); const auto* cfd = compact_->compaction->column_family_data(); @@ -978,12 +980,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { std::unique_ptr blob_file_builder( mutable_cf_options->enable_blob_files - ? 
new BlobFileBuilder( - versions_, fs_.get(), - sub_compact->compaction->immutable_cf_options(), - mutable_cf_options, &file_options_, job_id_, cfd->GetID(), - cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_, - io_tracer_, &blob_file_paths, &sub_compact->blob_file_additions) + ? new BlobFileBuilder(versions_, fs_.get(), + sub_compact->compaction->immutable_cf_options(), + mutable_cf_options, &file_options_, job_id_, + cfd->GetID(), cfd->GetName(), + Env::IOPriority::IO_LOW, write_hint_, + io_tracer_, blob_callback_, &blob_file_paths, + &sub_compact->blob_file_additions) : nullptr); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); @@ -1189,8 +1192,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (blob_file_builder) { if (status.ok()) { status = blob_file_builder->Finish(); + } else { + blob_file_builder->Abandon(); } - blob_file_builder.reset(); } diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 17937b5419..1b3da1511c 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -17,6 +17,7 @@ #include #include +#include "db/blob/blob_file_completion_callback.h" #include "db/column_family.h" #include "db/compaction/compaction_iterator.h" #include "db/dbformat.h" @@ -80,7 +81,8 @@ class CompactionJob { Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::atomic* manual_compaction_paused = nullptr, const std::string& db_id = "", const std::string& db_session_id = "", - std::string full_history_ts_low = ""); + std::string full_history_ts_low = "", + BlobFileCompletionCallback* blob_callback = nullptr); ~CompactionJob(); @@ -204,6 +206,7 @@ class CompactionJob { Env::Priority thread_pri_; IOStatus io_status_; std::string full_history_ts_low_; + BlobFileCompletionCallback* blob_callback_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index c17dad44c6..697fb6ffba 100644 --- 
a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -33,31 +33,6 @@ class DBCompactionTest : public DBTestBase { public: DBCompactionTest() : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {} - - std::vector GetBlobFileNumbers() { - VersionSet* const versions = dbfull()->TEST_GetVersionSet(); - assert(versions); - - ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); - assert(cfd); - - Version* const current = cfd->current(); - assert(current); - - const VersionStorageInfo* const storage_info = current->storage_info(); - assert(storage_info); - - const auto& blob_files = storage_info->GetBlobFiles(); - - std::vector result; - result.reserve(blob_files.size()); - - for (const auto& blob_file : blob_files) { - result.emplace_back(blob_file.first); - } - - return result; - } }; class DBCompactionTestWithParam diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index fc3cb1e658..aaa3b5125a 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -231,7 +231,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, preserve_deletes_(options.preserve_deletes), closed_(false), error_handler_(this, immutable_db_options_, &mutex_), - atomic_flush_install_cv_(&mutex_) { + atomic_flush_install_cv_(&mutex_), + blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_, + &error_handler_) { // !batch_per_trx_ implies seq_per_batch_ because it is only unset for // WriteUnprepared, which should use seq_per_batch_. assert(batch_per_txn_ || seq_per_batch_); @@ -511,6 +513,11 @@ Status DBImpl::CloseHelper() { } mutex_.Unlock(); + // Below check is added as recovery_error_ is not checked and it causes crash + // in DBSSTTest.DBWithMaxSpaceAllowedWithBlobFiles when space limit is + // reached. + error_handler_.GetRecoveryError().PermitUncheckedError(); + // CancelAllBackgroundWork called with false means we just set the shutdown // marker. 
After this we do a variant of the waiting and unschedule work // (to consider: moving all the waiting into CancelAllBackgroundWork(true)) @@ -3943,7 +3950,8 @@ Status DestroyDB(const std::string& dbname, const Options& options, std::string path_to_delete = dbname + "/" + fname; if (type == kMetaDatabase) { del = DestroyDB(path_to_delete, options); - } else if (type == kTableFile || type == kWalFile) { + } else if (type == kTableFile || type == kWalFile || + type == kBlobFile) { del = DeleteDBFile(&soptions, path_to_delete, dbname, /*force_bg=*/false, /*force_fg=*/!wal_in_db_path); } else { @@ -3968,9 +3976,10 @@ Status DestroyDB(const std::string& dbname, const Options& options, if (env->GetChildren(path, &filenames).ok()) { for (const auto& fname : filenames) { if (ParseFileName(fname, &number, &type) && - type == kTableFile) { // Lock file will be deleted at end - std::string table_path = path + "/" + fname; - Status del = DeleteDBFile(&soptions, table_path, dbname, + (type == kTableFile || + type == kBlobFile)) { // Lock file will be deleted at end + std::string file_path = path + "/" + fname; + Status del = DeleteDBFile(&soptions, file_path, dbname, /*force_bg=*/false, /*force_fg=*/false); if (!del.ok() && result.ok()) { result = del; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 97d8a4d227..12b880ca8f 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2219,6 +2219,8 @@ class DBImpl : public DB { InstrumentedCondVar atomic_flush_install_cv_; bool wal_in_db_path_; + + BlobFileCompletionCallback blob_callback_; }; extern Options SanitizeOptions(const std::string& db, const Options& src); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index a3ed3636c0..279cddd77e 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -166,7 +166,8 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, 
&event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri, - io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow()); + io_tracer_, db_id_, db_session_id_, cfd->GetFullHistoryTsLow(), + &blob_callback_); FileMetaData file_meta; TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"); diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 8e50668a6f..b8b26eeb0e 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1364,7 +1364,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, io_tracer_, &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, -1 /* level */, current_time, 0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */, - db_id_, db_session_id_); + db_id_, db_session_id_, nullptr /*full_history_ts_low*/, + &blob_callback_); LogFlush(immutable_db_options_.info_log); ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] [WriteLevel0TableForRecovery]" @@ -1723,6 +1724,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, std::vector metadata; + // TODO: Once GetLiveFilesMetaData supports blob files, update the logic + // below to get known_file_sizes for blob files. impl->mutex_.Lock(); impl->versions_->GetLiveFilesMetaData(&metadata); impl->mutex_.Unlock(); @@ -1755,13 +1758,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, FileType file_type; std::string file_path = path + "/" + file_name; if (ParseFileName(file_name, &file_number, &file_type) && - file_type == kTableFile) { + (file_type == kTableFile || file_type == kBlobFile)) { // TODO: Check for errors from OnAddFile? if (known_file_sizes.count(file_name)) { // We're assuming that each sst file name exists in at most one of // the paths. 
- sfm->OnAddFile(file_path, known_file_sizes.at(file_name), - /* compaction */ false) + sfm->OnAddFile(file_path, known_file_sizes.at(file_name)) .PermitUncheckedError(); } else { sfm->OnAddFile(file_path).PermitUncheckedError(); diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 7987daa804..21d9ccf2cb 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -306,13 +306,13 @@ TEST_F(DBSSTTest, DBWithSstFileManager) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); // Verify that we are tracking all sst files in dbname_ std::unordered_map files_in_db; - ASSERT_OK(GetAllSSTFiles(&files_in_db)); + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db)); ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); } ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); std::unordered_map files_in_db; - ASSERT_OK(GetAllSSTFiles(&files_in_db)); + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db)); // Verify that we are tracking all sst files in dbname_ ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); // Verify the total files size @@ -346,6 +346,260 @@ TEST_F(DBSSTTest, DBWithSstFileManager) { ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + int files_added = 0; + int files_deleted = 0; + int files_moved = 0; + int files_scheduled_to_delete = 0; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + files_added++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + files_deleted++; + } + }); + + 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) { + assert(arg); + const std::string* const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + ++files_scheduled_to_delete; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.enable_blob_files = true; + options.blob_file_size = 32; // create one blob per file + DestroyAndReopen(options); + Random rnd(301); + + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("Key_" + std::to_string(i), "Value_" + std::to_string(i))); + ASSERT_OK(Flush()); + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // Verify that we are tracking all sst and blob files in dbname_ + std::unordered_map files_in_db; + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db)); + ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db)); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + } + + std::vector blob_files = GetBlobFileNumbers(); + ASSERT_EQ(files_added, blob_files.size()); + // No blob file is obsoleted. + ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + // No files were moved. 
+ ASSERT_EQ(files_moved, 0); + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + std::unordered_map files_in_db; + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db)); + ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db)); + + // Verify that we are tracking all sst and blob files in dbname_ + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + // Verify the total files size + uint64_t total_files_size = 0; + for (auto& file_to_size : files_in_db) { + total_files_size += file_to_size.second; + } + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + Close(); + + Reopen(options); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + // Verify that we track all the files again after the DB is closed and opened. + Close(); + + sst_file_manager.reset(NewSstFileManager(env_)); + options.sst_file_manager = sst_file_manager; + sfm = static_cast(sst_file_manager.get()); + + Reopen(options); + + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + // Destroy DB and it will remove all the blob files from sst file manager and + // blob files deletion will go through ScheduleFileDeletion. 
+ ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + ASSERT_EQ(files_deleted, blob_files.size()); + ASSERT_EQ(files_scheduled_to_delete, blob_files.size()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + +TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.enable_blob_files = true; + options.blob_file_size = 32; // create one blob per file + options.disable_auto_compactions = true; + options.enable_blob_garbage_collection = true; + options.blob_garbage_collection_age_cutoff = 0.5; + + int files_added = 0; + int files_deleted = 0; + int files_moved = 0; + int files_scheduled_to_delete = 0; + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + files_added++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { + const std::string* const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + files_deleted++; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::ScheduleFileDeletion", [&](void* arg) { + assert(arg); + const std::string* const file_path = + static_cast(arg); + if (file_path->find(".blob") != std::string::npos) { + ++files_scheduled_to_delete; + } + }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + 
+ DestroyAndReopen(options); + Random rnd(301); + + constexpr char first_key[] = "first_key"; + constexpr char first_value[] = "first_value"; + constexpr char second_key[] = "second_key"; + constexpr char second_value[] = "second_value"; + + ASSERT_OK(Put(first_key, first_value)); + ASSERT_OK(Put(second_key, second_value)); + ASSERT_OK(Flush()); + + constexpr char third_key[] = "third_key"; + constexpr char third_value[] = "third_value"; + constexpr char fourth_key[] = "fourth_key"; + constexpr char fourth_value[] = "fourth_value"; + constexpr char fifth_key[] = "fifth_key"; + constexpr char fifth_value[] = "fifth_value"; + + ASSERT_OK(Put(third_key, third_value)); + ASSERT_OK(Put(fourth_key, fourth_value)); + ASSERT_OK(Put(fifth_key, fifth_value)); + ASSERT_OK(Flush()); + + const std::vector original_blob_files = GetBlobFileNumbers(); + + ASSERT_EQ(original_blob_files.size(), 5); + ASSERT_EQ(files_added, 5); + ASSERT_EQ(files_deleted, 0); + ASSERT_EQ(files_scheduled_to_delete, 0); + ASSERT_EQ(files_moved, 0); + { + // Verify that we are tracking all sst and blob files in dbname_ + std::unordered_map files_in_db; + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db)); + ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db)); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + } + + const size_t cutoff_index = static_cast( + options.blob_garbage_collection_age_cutoff * original_blob_files.size()); + + size_t expected_number_of_files = original_blob_files.size(); + // Note: turning off enable_blob_files before the compaction results in + // garbage collected values getting inlined. 
+ ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}})); + expected_number_of_files -= cutoff_index; + files_added = 0; + + constexpr Slice* begin = nullptr; + constexpr Slice* end = nullptr; + + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end)); + + ASSERT_EQ(Get(first_key), first_value); + ASSERT_EQ(Get(second_key), second_value); + ASSERT_EQ(Get(third_key), third_value); + ASSERT_EQ(Get(fourth_key), fourth_value); + ASSERT_EQ(Get(fifth_key), fifth_value); + + const std::vector new_blob_files = GetBlobFileNumbers(); + + ASSERT_EQ(new_blob_files.size(), expected_number_of_files); + // No new file is added. + ASSERT_EQ(files_added, 0); + ASSERT_EQ(files_deleted, cutoff_index); + ASSERT_EQ(files_scheduled_to_delete, cutoff_index); + ASSERT_EQ(files_moved, 0); + + // Original blob files below the cutoff should be gone, original blob files at + // or above the cutoff should be still there + for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) { + ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]); + } + + { + // Verify that we are tracking all sst and blob files in dbname_ + std::unordered_map files_in_db; + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db)); + ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db)); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + } + + Close(); + ASSERT_OK(DestroyDB(dbname_, options)); + ASSERT_EQ(files_deleted, 5); + ASSERT_EQ(files_scheduled_to_delete, 5); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); +} + TEST_F(DBSSTTest, RateLimitedDelete) { Destroy(last_options_); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({ @@ -780,7 +1034,7 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) { uint64_t first_file_size = 0; std::unordered_map files_in_db; - ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &first_file_size)); 
ASSERT_EQ(sfm->GetTotalSize(), first_file_size); // Set the maximum allowed space usage to the current total size @@ -791,6 +1045,60 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) { ASSERT_NOK(Flush()); } +TEST_F(DBSSTTest, DBWithMaxSpaceAllowedWithBlobFiles) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + options.disable_auto_compactions = true; + options.enable_blob_files = true; + DestroyAndReopen(options); + + Random rnd(301); + + // Generate a file containing keys. + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put(Key(i), rnd.RandomString(50))); + } + ASSERT_OK(Flush()); + + uint64_t files_size = 0; + uint64_t total_files_size = 0; + std::unordered_map files_in_db; + + ASSERT_OK(GetAllDataFiles(kBlobFile, &files_in_db, &files_size)); + // Make sure blob files are considered by SstFileManager in size limits. + ASSERT_GT(files_size, 0); + total_files_size = files_size; + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &files_size)); + total_files_size += files_size; + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + // Set the maximum allowed space usage just above the SST files' size. + sfm->SetMaxAllowedSpaceUsage(files_size + 1); + + bool max_allowed_space_reached = false; + bool delete_blob_file = false; + // Sync point called after blob file is closed and max allowed space is + // checked.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BlobFileCompletionCallback::CallBack::MaxAllowedSpaceReached", + [&](void* /*arg*/) { max_allowed_space_reached = true; }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BuildTable::AfterDeleteFile", + [&](void* /*arg*/) { delete_blob_file = true; }); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put("key1", "val1")); + // This flush will fail + ASSERT_NOK(Flush()); + ASSERT_TRUE(max_allowed_space_reached); + ASSERT_TRUE(delete_blob_file); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBSSTTest, CancellingCompactionsWorks) { std::shared_ptr sst_file_manager(NewSstFileManager(env_)); auto sfm = static_cast(sst_file_manager.get()); @@ -821,7 +1129,7 @@ TEST_F(DBSSTTest, CancellingCompactionsWorks) { ASSERT_OK(Flush()); uint64_t total_file_size = 0; std::unordered_map files_in_db; - ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size)); // Set the maximum allowed space usage to the current total size sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1); @@ -869,7 +1177,7 @@ TEST_F(DBSSTTest, CancellingManualCompactionsWorks) { ASSERT_OK(Flush()); uint64_t total_file_size = 0; std::unordered_map files_in_db; - ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_file_size)); // Set the maximum allowed space usage to the current total size sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1); @@ -986,7 +1294,7 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) { ASSERT_TRUE(bg_error_set); uint64_t total_sst_files_size = 0; std::unordered_map files_in_db; - ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &files_in_db, &total_sst_files_size)); ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024); 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 40a31b66d5..bdc0568ee5 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1127,8 +1127,34 @@ std::string DBTestBase::FilesPerLevel(int cf) { result.resize(last_non_zero_offset); return result; } + #endif // !ROCKSDB_LITE +std::vector DBTestBase::GetBlobFileNumbers() { + VersionSet* const versions = dbfull()->TEST_GetVersionSet(); + assert(versions); + + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + assert(cfd); + + Version* const current = cfd->current(); + assert(current); + + const VersionStorageInfo* const storage_info = current->storage_info(); + assert(storage_info); + + const auto& blob_files = storage_info->GetBlobFiles(); + + std::vector result; + result.reserve(blob_files.size()); + + for (const auto& blob_file : blob_files) { + result.emplace_back(blob_file.first); + } + + return result; +} + size_t DBTestBase::CountFiles() { size_t count = 0; std::vector files; @@ -1437,26 +1463,26 @@ void DBTestBase::CopyFile(const std::string& source, ASSERT_OK(destfile->Close()); } -Status DBTestBase::GetAllSSTFiles( - std::unordered_map* sst_files, +Status DBTestBase::GetAllDataFiles( + const FileType file_type, std::unordered_map* files, uint64_t* total_size /* = nullptr */) { if (total_size) { *total_size = 0; } - std::vector files; - Status s = env_->GetChildren(dbname_, &files); + std::vector children; + Status s = env_->GetChildren(dbname_, &children); if (s.ok()) { - for (auto& file_name : files) { + for (auto& file_name : children) { uint64_t number; FileType type; - if (ParseFileName(file_name, &number, &type) && type == kTableFile) { + if (ParseFileName(file_name, &number, &type) && type == file_type) { std::string file_path = dbname_ + "/" + file_name; uint64_t file_size = 0; s = env_->GetFileSize(file_path, &file_size); if (!s.ok()) { break; } - (*sst_files)[file_path] = file_size; + 
(*files)[file_path] = file_size; if (total_size) { *total_size += file_size; } diff --git a/db/db_test_util.h b/db/db_test_util.h index 7bee90be11..98955866fd 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1064,6 +1064,8 @@ class DBTestBase : public testing::Test { int TotalTableFiles(int cf = 0, int levels = -1); #endif // ROCKSDB_LITE + std::vector GetBlobFileNumbers(); + // Return spread of files per level std::string FilesPerLevel(int cf = 0); @@ -1153,8 +1155,9 @@ class DBTestBase : public testing::Test { void CopyFile(const std::string& source, const std::string& destination, uint64_t size = 0); - Status GetAllSSTFiles(std::unordered_map* sst_files, - uint64_t* total_size = nullptr); + Status GetAllDataFiles(const FileType file_type, + std::unordered_map* sst_files, + uint64_t* total_size = nullptr); std::vector ListTableFiles(Env* env, const std::string& path); diff --git a/db/flush_job.cc b/db/flush_job.cc index b73dc8ce0c..4f137b9844 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -80,24 +80,22 @@ const char* GetFlushReasonString (FlushReason flush_reason) { } } -FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, - const ImmutableDBOptions& db_options, - const MutableCFOptions& mutable_cf_options, - uint64_t max_memtable_id, const FileOptions& file_options, - VersionSet* versions, InstrumentedMutex* db_mutex, - std::atomic* shutting_down, - std::vector existing_snapshots, - SequenceNumber earliest_write_conflict_snapshot, - SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, FSDirectory* db_directory, - FSDirectory* output_file_directory, - CompressionType output_compression, Statistics* stats, - EventLogger* event_logger, bool measure_io_stats, - const bool sync_output_directory, const bool write_manifest, - Env::Priority thread_pri, - const std::shared_ptr& io_tracer, - const std::string& db_id, const std::string& db_session_id, - std::string full_history_ts_low) +FlushJob::FlushJob( + 
const std::string& dbname, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, + const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id, + const FileOptions& file_options, VersionSet* versions, + InstrumentedMutex* db_mutex, std::atomic* shutting_down, + std::vector existing_snapshots, + SequenceNumber earliest_write_conflict_snapshot, + SnapshotChecker* snapshot_checker, JobContext* job_context, + LogBuffer* log_buffer, FSDirectory* db_directory, + FSDirectory* output_file_directory, CompressionType output_compression, + Statistics* stats, EventLogger* event_logger, bool measure_io_stats, + const bool sync_output_directory, const bool write_manifest, + Env::Priority thread_pri, const std::shared_ptr& io_tracer, + const std::string& db_id, const std::string& db_session_id, + std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) : dbname_(dbname), db_id_(db_id), db_session_id_(db_session_id), @@ -128,7 +126,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, thread_pri_(thread_pri), io_tracer_(io_tracer), clock_(db_options_.clock), - full_history_ts_low_(std::move(full_history_ts_low)) { + full_history_ts_low_(std::move(full_history_ts_low)), + blob_callback_(blob_callback) { // Update the thread status to indicate flush. 
ReportStartedFlush(); TEST_SYNC_POINT("FlushJob::FlushJob()"); @@ -418,7 +417,7 @@ Status FlushJob::WriteLevel0Table() { TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_, job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, creation_time, oldest_key_time, write_hint, current_time, db_id_, - db_session_id_, full_history_ts_low); + db_session_id_, full_history_ts_low, blob_callback_); if (!io_s.ok()) { io_status_ = io_s; } diff --git a/db/flush_job.h b/db/flush_job.h index 78b5c76132..ff2ad85bca 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -17,6 +17,7 @@ #include #include +#include "db/blob/blob_file_completion_callback.h" #include "db/column_family.h" #include "db/dbformat.h" #include "db/flush_scheduler.h" @@ -73,7 +74,8 @@ class FlushJob { const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri, const std::shared_ptr& io_tracer, const std::string& db_id = "", const std::string& db_session_id = "", - std::string full_history_ts_low = ""); + std::string full_history_ts_low = "", + BlobFileCompletionCallback* blob_callback = nullptr); ~FlushJob(); @@ -165,6 +167,7 @@ class FlushJob { SystemClock* clock_; const std::string full_history_ts_low_; + BlobFileCompletionCallback* blob_callback_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/repair_test.cc b/db/repair_test.cc index 85337cbfd8..9ea2d9460e 100644 --- a/db/repair_test.cc +++ b/db/repair_test.cc @@ -184,7 +184,7 @@ TEST_F(RepairTest, UnflushedSst) { { uint64_t total_ssts_size; std::unordered_map sst_files; - ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size)); ASSERT_EQ(total_ssts_size, 0); } // Need to get path before Close() deletes db_, but delete it after Close() to @@ -203,7 +203,7 @@ TEST_F(RepairTest, UnflushedSst) { { uint64_t total_ssts_size; std::unordered_map sst_files; - ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size)); + 
ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size)); ASSERT_GT(total_ssts_size, 0); } ASSERT_EQ(Get("key"), "val"); @@ -221,7 +221,7 @@ TEST_F(RepairTest, SeparateWalDir) { { uint64_t total_ssts_size; std::unordered_map sst_files; - ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size)); ASSERT_EQ(total_ssts_size, 0); } std::string manifest_path = @@ -241,7 +241,7 @@ TEST_F(RepairTest, SeparateWalDir) { { uint64_t total_ssts_size; std::unordered_map sst_files; - ASSERT_OK(GetAllSSTFiles(&sst_files, &total_ssts_size)); + ASSERT_OK(GetAllDataFiles(kTableFile, &sst_files, &total_ssts_size)); ASSERT_GT(total_ssts_size, 0); } ASSERT_EQ(Get("key"), "val"); diff --git a/file/delete_scheduler_test.cc b/file/delete_scheduler_test.cc index 4981336f5e..e6f590a526 100644 --- a/file/delete_scheduler_test.cc +++ b/file/delete_scheduler_test.cc @@ -87,7 +87,7 @@ class DeleteSchedulerTest : public testing::Test { std::string data(size, 'A'); EXPECT_OK(f->Append(data)); EXPECT_OK(f->Close()); - sst_file_mgr_->OnAddFile(file_path, false); + sst_file_mgr_->OnAddFile(file_path); return file_path; } diff --git a/file/sst_file_manager_impl.cc b/file/sst_file_manager_impl.cc index df037842ba..cc03e54441 100644 --- a/file/sst_file_manager_impl.cc +++ b/file/sst_file_manager_impl.cc @@ -27,7 +27,6 @@ SstFileManagerImpl::SstFileManagerImpl( fs_(fs), logger_(logger), total_files_size_(0), - in_progress_files_size_(0), compaction_buffer_size_(0), cur_compactions_reserved_size_(0), max_allowed_space_(0), @@ -60,23 +59,24 @@ void SstFileManagerImpl::Close() { } } -Status SstFileManagerImpl::OnAddFile(const std::string& file_path, - bool compaction) { +Status SstFileManagerImpl::OnAddFile(const std::string& file_path) { uint64_t file_size; Status s = fs_->GetFileSize(file_path, IOOptions(), &file_size, nullptr); if (s.ok()) { MutexLock l(&mu_); - OnAddFileImpl(file_path, file_size, compaction); + 
OnAddFileImpl(file_path, file_size); } - TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile"); + TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnAddFile", + const_cast(&file_path)); return s; } Status SstFileManagerImpl::OnAddFile(const std::string& file_path, - uint64_t file_size, bool compaction) { + uint64_t file_size) { MutexLock l(&mu_); - OnAddFileImpl(file_path, file_size, compaction); - TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile"); + OnAddFileImpl(file_path, file_size); + TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnAddFile", + const_cast(&file_path)); return Status::OK(); } @@ -85,7 +85,8 @@ Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) { MutexLock l(&mu_); OnDeleteFileImpl(file_path); } - TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile"); + TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnDeleteFile", + const_cast(&file_path)); return Status::OK(); } @@ -99,19 +100,6 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) { } } cur_compactions_reserved_size_ -= size_added_by_compaction; - - auto new_files = c->edit()->GetNewFiles(); - for (auto& new_file : new_files) { - auto fn = TableFileName(c->immutable_cf_options()->cf_paths, - new_file.second.fd.GetNumber(), - new_file.second.fd.GetPathId()); - if (in_progress_files_.find(fn) != in_progress_files_.end()) { - auto tracked_file = tracked_files_.find(fn); - assert(tracked_file != tracked_files_.end()); - in_progress_files_size_ -= tracked_file->second; - in_progress_files_.erase(fn); - } - } } Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, @@ -122,7 +110,7 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, if (file_size != nullptr) { *file_size = tracked_files_[old_path]; } - OnAddFileImpl(new_path, tracked_files_[old_path], false); + OnAddFileImpl(new_path, tracked_files_[old_path]); OnDeleteFileImpl(old_path); } TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile"); @@ -199,7 +187,6 @@ bool 
SstFileManagerImpl::EnoughRoomForCompaction( if (compaction_buffer_size_ == 0) { needed_headroom += reserved_disk_buffer_; } - needed_headroom -= in_progress_files_size_; if (free_space < needed_headroom + size_added_by_compaction) { // We hit the condition of not enough disk space ROCKS_LOG_ERROR(logger_, @@ -440,24 +427,15 @@ void SstFileManagerImpl::WaitForEmptyTrash() { } void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path, - uint64_t file_size, bool compaction) { + uint64_t file_size) { auto tracked_file = tracked_files_.find(file_path); if (tracked_file != tracked_files_.end()) { // File was added before, we will just update the size - assert(!compaction); total_files_size_ -= tracked_file->second; total_files_size_ += file_size; cur_compactions_reserved_size_ -= file_size; } else { total_files_size_ += file_size; - if (compaction) { - // Keep track of the size of files created by in-progress compactions. - // When calculating whether there's enough headroom for new compactions, - // this will be subtracted from cur_compactions_reserved_size_. - // Otherwise, compactions will be double counted. 
- in_progress_files_size_ += file_size; - in_progress_files_.insert(file_path); - } } tracked_files_[file_path] = file_size; } @@ -466,16 +444,10 @@ void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) { auto tracked_file = tracked_files_.find(file_path); if (tracked_file == tracked_files_.end()) { // File is not tracked - assert(in_progress_files_.find(file_path) == in_progress_files_.end()); return; } total_files_size_ -= tracked_file->second; - // Check if it belonged to an in-progress compaction - if (in_progress_files_.find(file_path) != in_progress_files_.end()) { - in_progress_files_size_ -= tracked_file->second; - in_progress_files_.erase(file_path); - } tracked_files_.erase(tracked_file); } diff --git a/file/sst_file_manager_impl.h b/file/sst_file_manager_impl.h index 52ef1917e5..d6db3d494f 100644 --- a/file/sst_file_manager_impl.h +++ b/file/sst_file_manager_impl.h @@ -21,9 +21,8 @@ class FileSystem; class SystemClock; class Logger; -// SstFileManager is used to track SST files in the DB and control their -// deletion rate. -// All SstFileManager public functions are thread-safe. +// SstFileManager is used to track SST and blob files in the DB and control +// their deletion rate. All SstFileManager public functions are thread-safe. class SstFileManagerImpl : public SstFileManager { public: explicit SstFileManagerImpl(const std::shared_ptr& clock, @@ -35,24 +34,23 @@ class SstFileManagerImpl : public SstFileManager { ~SstFileManagerImpl(); - // DB will call OnAddFile whenever a new sst file is added. - Status OnAddFile(const std::string& file_path, bool compaction = false); + // DB will call OnAddFile whenever a new sst/blob file is added. + Status OnAddFile(const std::string& file_path); // Overload where size of the file is provided by the caller rather than // queried from the filesystem. This is an optimization. 
- Status OnAddFile(const std::string& file_path, uint64_t file_size, - bool compaction); + Status OnAddFile(const std::string& file_path, uint64_t file_size); - // DB will call OnDeleteFile whenever an sst file is deleted. + // DB will call OnDeleteFile whenever an sst/blob file is deleted. Status OnDeleteFile(const std::string& file_path); - // DB will call OnMoveFile whenever an sst file is move to a new path. + // DB will call OnMoveFile whenever an sst/blob file is moved to a new path. Status OnMoveFile(const std::string& old_path, const std::string& new_path, uint64_t* file_size = nullptr); // Update the maximum allowed space that should be used by RocksDB, if - // the total size of the SST files exceeds max_allowed_space, writes to - // RocksDB will fail. + // the total size of the SST and blob files exceeds max_allowed_space, writes + // to RocksDB will fail. // // Setting max_allowed_space to 0 will disable this feature, maximum allowed // space will be infinite (Default value). @@ -62,8 +60,8 @@ class SstFileManagerImpl : public SstFileManager { void SetCompactionBufferSize(uint64_t compaction_buffer_size) override; - // Return true if the total size of SST files exceeded the maximum allowed - // space usage. + // Return true if the total size of SST and blob files exceeded the maximum + // allowed space usage. // // thread-safe.
bool IsMaxAllowedSpaceReached() override; @@ -142,8 +140,7 @@ class SstFileManagerImpl : public SstFileManager { private: // REQUIRES: mutex locked - void OnAddFileImpl(const std::string& file_path, uint64_t file_size, - bool compaction); + void OnAddFileImpl(const std::string& file_path, uint64_t file_size); // REQUIRES: mutex locked void OnDeleteFileImpl(const std::string& file_path); @@ -159,8 +156,6 @@ class SstFileManagerImpl : public SstFileManager { port::Mutex mu_; // The summation of the sizes of all files in tracked_files_ map uint64_t total_files_size_; - // The summation of all output files of in-progress compactions - uint64_t in_progress_files_size_; // Compactions should only execute if they can leave at least // this amount of buffer space for logs and flushes uint64_t compaction_buffer_size_; @@ -169,9 +164,7 @@ class SstFileManagerImpl : public SstFileManager { // A map containing all tracked files and there sizes // file_path => file_size std::unordered_map tracked_files_; - // A set of files belonging to in-progress compactions - std::unordered_set in_progress_files_; - // The maximum allowed space (in bytes) for sst files. + // The maximum allowed space (in bytes) for sst and blob files. uint64_t max_allowed_space_; // DeleteScheduler used to throttle file deletition. DeleteScheduler delete_scheduler_; @@ -191,7 +184,7 @@ class SstFileManagerImpl : public SstFileManager { // compactions to run full throttle. If disk space is below this trigger, // compactions will be gated by free disk space > input size uint64_t free_space_trigger_; - // List of database error handler instances tracked by this sst file manager + // List of database error handler instances tracked by this SstFileManager. 
std::list error_handler_list_; // Pointer to ErrorHandler instance that is currently processing recovery ErrorHandler* cur_instance_; diff --git a/include/rocksdb/sst_file_manager.h b/include/rocksdb/sst_file_manager.h index 350dec7a8b..5aae88dc1e 100644 --- a/include/rocksdb/sst_file_manager.h +++ b/include/rocksdb/sst_file_manager.h @@ -19,17 +19,16 @@ namespace ROCKSDB_NAMESPACE { class Env; class Logger; -// SstFileManager is used to track SST files in the DB and control their -// deletion rate. -// All SstFileManager public functions are thread-safe. +// SstFileManager is used to track SST and blob files in the DB and control +// their deletion rate. All SstFileManager public functions are thread-safe. // SstFileManager is not extensible. class SstFileManager { public: virtual ~SstFileManager() {} // Update the maximum allowed space that should be used by RocksDB, if - // the total size of the SST files exceeds max_allowed_space, writes to - // RocksDB will fail. + // the total size of the SST and blob files exceeds max_allowed_space, writes + // to RocksDB will fail. // // Setting max_allowed_space to 0 will disable this feature; maximum allowed // space will be infinite (Default value). @@ -43,14 +42,14 @@ class SstFileManager { // other background functions may continue, such as logging and flushing. virtual void SetCompactionBufferSize(uint64_t compaction_buffer_size) = 0; - // Return true if the total size of SST files exceeded the maximum allowed - // space usage. + // Return true if the total size of SST and blob files exceeded the maximum + // allowed space usage. // // thread-safe. virtual bool IsMaxAllowedSpaceReached() = 0; - // Returns true if the total size of SST files as well as estimated size - // of ongoing compactions exceeds the maximums allowed space usage. + // Returns true if the total size of SST and blob files as well as estimated + // size of ongoing compactions exceeds the maximum allowed space usage.
virtual bool IsMaxAllowedSpaceReachedIncludingCompactions() = 0; // Return the total size of all tracked files. @@ -87,7 +86,7 @@ class SstFileManager { }; // Create a new SstFileManager that can be shared among multiple RocksDB -// instances to track SST file and control there deletion rate. +// instances to track SST and blob files and control their deletion rate. // Even though SstFileManager don't track WAL files but it still control // there deletion rate. //