diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index 1fd9261417..f763ced20c 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -57,12 +57,16 @@ Status BlobDB::OpenAndLoad(const Options& options, { MutexLock l(&listener_mutex); all_blobdb_listeners.push_back(fblistener); - all_blobdb_listeners.push_back(ce_listener); + if (bdb_options.enable_garbage_collection) { + all_blobdb_listeners.push_back(ce_listener); + } all_wal_filters.push_back(rw_filter); } changed_options->listeners.emplace_back(fblistener); - changed_options->listeners.emplace_back(ce_listener); + if (bdb_options.enable_garbage_collection) { + changed_options->listeners.emplace_back(ce_listener); + } changed_options->wal_filter = rw_filter.get(); DBOptions db_options(*changed_options); @@ -71,7 +75,9 @@ Status BlobDB::OpenAndLoad(const Options& options, BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); fblistener->SetImplPtr(bdb); - ce_listener->SetImplPtr(bdb); + if (bdb_options.enable_garbage_collection) { + ce_listener->SetImplPtr(bdb); + } rw_filter->SetImplPtr(bdb); Status s = bdb->OpenPhase1(); @@ -124,20 +130,26 @@ Status BlobDB::Open(const DBOptions& db_options_input, ReconcileWalFilter_t rw_filter = std::make_shared(); db_options.listeners.emplace_back(fblistener); - db_options.listeners.emplace_back(ce_listener); + if (bdb_options.enable_garbage_collection) { + db_options.listeners.emplace_back(ce_listener); + } db_options.wal_filter = rw_filter.get(); { MutexLock l(&listener_mutex); all_blobdb_listeners.push_back(fblistener); - all_blobdb_listeners.push_back(ce_listener); + if (bdb_options.enable_garbage_collection) { + all_blobdb_listeners.push_back(ce_listener); + } all_wal_filters.push_back(rw_filter); } // we need to open blob db first so that recovery can happen BlobDBImpl* bdb = new BlobDBImpl(dbname, bdb_options, db_options); fblistener->SetImplPtr(bdb); - ce_listener->SetImplPtr(bdb); + if (bdb_options.enable_garbage_collection) { + ce_listener->SetImplPtr(bdb); + } rw_filter->SetImplPtr(bdb); s = bdb->OpenPhase1(); @@ -172,25 +184,27 @@ Status BlobDB::Open(const DBOptions& db_options_input, BlobDB::BlobDB(DB* db) : StackableDB(db) {} void BlobDBOptions::Dump(Logger* log) const { - ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", + ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", blob_dir.c_str()); - ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d", + ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d", path_relative); - ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d", + ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d", is_fifo); - ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64, + ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64, blob_dir_size); - ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32, + ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32, ttl_range_secs); - ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64, + ROCKS_LOG_HEADER(log, " blob_db_options.bytes_per_sync: %" PRIu64, bytes_per_sync); - ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, + ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, blob_file_size); - ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", + ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", ttl_extractor.get()); - ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", + ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", static_cast(compression)); - ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d", + ROCKS_LOG_HEADER(log, "blob_db_options.enable_garbage_collection: %d", + enable_garbage_collection); + ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d", disable_background_tasks); } diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 1ef382ab86..3ade460eb2 100644 --- a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -71,7 +71,12 @@ struct BlobDBOptions { // what compression to use for Blob's CompressionType compression = kNoCompression; - // Disable all background job. + // If enabled, blob DB periodically cleanup stale data by rewriting remaining + // live data in blob files to new files. If garbage collection is not enabled, + // blob files will be cleanup based on TTL. + bool enable_garbage_collection = false; + + // Disable all background job. Used for test only. bool disable_background_tasks = false; void Dump(Logger* log) const; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 17b832e37d..9aeaadbaea 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -69,6 +69,7 @@ void EvictAllVersionsCompactionListener::InternalListener::OnCompaction( int level, const Slice& key, CompactionEventListener::CompactionListenerValueType value_type, const Slice& existing_value, const SequenceNumber& sn, bool is_new) { + assert(impl_->bdb_options_.enable_garbage_collection); if (!is_new && value_type == CompactionEventListener::CompactionListenerValueType::kValue) { @@ -213,12 +214,14 @@ void BlobDBImpl::StartBackgroundTasks() { std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1)); tqueue_.add(kGCCheckPeriodMillisecs, std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1)); - tqueue_.add( - kDeleteCheckPeriodMillisecs, - std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1)); - tqueue_.add( - kDeleteCheckPeriodMillisecs, - std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1)); + if (bdb_options_.enable_garbage_collection) { + tqueue_.add( + kDeleteCheckPeriodMillisecs, + std::bind(&BlobDBImpl::EvictDeletions, this, std::placeholders::_1)); + tqueue_.add( + kDeleteCheckPeriodMillisecs, + std::bind(&BlobDBImpl::EvictCompacted, this, std::placeholders::_1)); + } tqueue_.add( kDeleteObsoleteFilesPeriodMillisecs, std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1)); @@ -659,8 +662,10 @@ Status BlobDBImpl::Delete(const WriteOptions& options, const Slice& key) { SequenceNumber lsn = db_impl_->GetLatestSequenceNumber(); Status s = db_->Delete(options, key); - // add deleted key to list of keys that have been deleted for book-keeping - delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn}); + if (bdb_options_.enable_garbage_collection) { + // add deleted key to list of keys that have been deleted for book-keeping + delete_keys_q_.enqueue({DefaultColumnFamily(), key.ToString(), lsn}); + } return s; } @@ -780,11 +785,13 @@ Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) { SequenceNumber sequence_; }; - // add deleted key to list of keys that have been deleted for book-keeping - DeleteBookkeeper delete_bookkeeper(this, current_seq); - updates->Iterate(&delete_bookkeeper); + if (bdb_options_.enable_garbage_collection) { + // add deleted key to list of keys that have been deleted for book-keeping + DeleteBookkeeper delete_bookkeeper(this, current_seq); + s = updates->Iterate(&delete_bookkeeper); + } - return Status::OK(); + return s; } Status BlobDBImpl::GetLiveFiles(std::vector& ret, @@ -1318,6 +1325,7 @@ bool BlobDBImpl::FileDeleteOk_SnapshotCheckLocked( bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, uint64_t blob_offset, uint64_t blob_size) { + assert(bdb_options_.enable_garbage_collection); (void)blob_offset; std::shared_ptr bfile; { @@ -1340,6 +1348,7 @@ bool BlobDBImpl::FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size, } bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) { + assert(bdb_options_.enable_garbage_collection); BlobIndex blob_index; Status s = blob_index.DecodeFrom(index_entry); if (!s.ok()) { @@ -1354,6 +1363,7 @@ bool BlobDBImpl::MarkBlobDeleted(const Slice& key, const Slice& index_entry) { } std::pair BlobDBImpl::EvictCompacted(bool aborted) { + assert(bdb_options_.enable_garbage_collection); if (aborted) return std::make_pair(false, -1); override_packet_t packet; @@ -1377,6 +1387,7 @@ std::pair BlobDBImpl::EvictCompacted(bool aborted) { } std::pair BlobDBImpl::EvictDeletions(bool aborted) { + assert(bdb_options_.enable_garbage_collection); if (aborted) return std::make_pair(false, -1); ColumnFamilyHandle* last_cfh = nullptr; @@ -1882,10 +1893,12 @@ bool BlobDBImpl::ShouldGCFile(std::shared_ptr bfile, uint64_t now, ReadLock lockbfile_r(&bfile->mutex_); - if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) > - kPartialExpirationPercentage) { - *reason = "deleted simple blobs beyond threshold"; - return true; + if (bdb_options_.enable_garbage_collection) { + if ((bfile->deleted_size_ * 100.0 / bfile->file_size_.load()) > + kPartialExpirationPercentage) { + *reason = "deleted simple blobs beyond threshold"; + return true; + } } // if we haven't reached limits of disk space, don't DELETE