diff --git a/CMakeLists.txt b/CMakeLists.txt index 5a9bd53180..739da75b4d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -575,6 +575,7 @@ set(SOURCES util/transaction_test_util.cc util/xxhash.cc utilities/backupable/backupable_db.cc + utilities/blob_db/blob_compaction_filter.cc utilities/blob_db/blob_db.cc utilities/blob_db/blob_db_impl.cc utilities/blob_db/blob_dump_tool.cc diff --git a/TARGETS b/TARGETS index ca0ab48d0f..b7dae2caa7 100644 --- a/TARGETS +++ b/TARGETS @@ -217,6 +217,7 @@ cpp_library( "util/transaction_test_util.cc", "util/xxhash.cc", "utilities/backupable/backupable_db.cc", + "utilities/blob_db/blob_compaction_filter.cc", "utilities/blob_db/blob_db.cc", "utilities/blob_db/blob_db_impl.cc", "utilities/blob_db/blob_dump_tool.cc", diff --git a/db/compaction_iterator.cc b/db/compaction_iterator.cc index 45d786f0d4..3d62f43a4f 100644 --- a/db/compaction_iterator.cc +++ b/db/compaction_iterator.cc @@ -167,10 +167,13 @@ void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, CompactionFilter::ValueType value_type = ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue : CompactionFilter::ValueType::kBlobIndex; + // Hack: pass internal key to BlobIndexCompactionFilter since it needs + // to get sequence number. + Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_; { StopWatchNano timer(env_, true); filter = compaction_filter_->FilterV2( - compaction_->level(), ikey_.user_key, value_type, value_, + compaction_->level(), filter_key, value_type, value_, &compaction_filter_value_, compaction_filter_skip_until_.rep()); iter_stats_.total_filter_time += env_ != nullptr ? 
timer.ElapsedNanos() : 0; diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 57c8daf338..8cf8b7d3e6 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -265,7 +265,16 @@ enum Tickers : uint32_t { BLOB_DB_BLOB_FILE_SYNCED, // # of blob index evicted from base DB by BlobDB compaction filter because // of expiration. - BLOB_DB_BLOB_INDEX_EXPIRED, + BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, + // size of blob index evicted from base DB by BlobDB compaction filter + // because of expiration. + BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, + // # of blob index evicted from base DB by BlobDB compaction filter because + // of corresponding file deleted. + BLOB_DB_BLOB_INDEX_EVICTED_COUNT, + // size of blob index evicted from base DB by BlobDB compaction filter + // because of corresponding file deleted. + BLOB_DB_BLOB_INDEX_EVICTED_SIZE, // # of blob files being garbage collected. BLOB_DB_GC_NUM_FILES, // # of blob files generated by garbage collection. @@ -417,7 +426,12 @@ const std::vector> TickersNameMap = { {BLOB_DB_BLOB_FILE_BYTES_WRITTEN, "rocksdb.blobdb.blob.file.bytes.written"}, {BLOB_DB_BLOB_FILE_BYTES_READ, "rocksdb.blobdb.blob.file.bytes.read"}, {BLOB_DB_BLOB_FILE_SYNCED, "rocksdb.blobdb.blob.file.synced"}, - {BLOB_DB_BLOB_INDEX_EXPIRED, "rocksdb.blobdb.blob.index.expired"}, + {BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, + "rocksdb.blobdb.blob.index.expired.count"}, + {BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, "rocksdb.blobdb.blob.index.expired.size"}, + {BLOB_DB_BLOB_INDEX_EVICTED_COUNT, + "rocksdb.blobdb.blob.index.evicted.count"}, + {BLOB_DB_BLOB_INDEX_EVICTED_SIZE, "rocksdb.blobdb.blob.index.evicted.size"}, {BLOB_DB_GC_NUM_FILES, "rocksdb.blobdb.gc.num.files"}, {BLOB_DB_GC_NUM_NEW_FILES, "rocksdb.blobdb.gc.num.new.files"}, {BLOB_DB_GC_FAILURES, "rocksdb.blobdb.gc.failures"}, diff --git a/src.mk b/src.mk index c2ad87c54a..a0fcd80f8e 100644 --- a/src.mk +++ b/src.mk @@ -153,6 +153,7 @@ LIB_SOURCES = \ util/transaction_test_util.cc \ 
util/xxhash.cc \ utilities/backupable/backupable_db.cc \ + utilities/blob_db/blob_compaction_filter.cc \ utilities/blob_db/blob_db.cc \ utilities/blob_db/blob_db_impl.cc \ utilities/blob_db/blob_file.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 40e033167a..855e0764f5 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -686,7 +686,7 @@ DEFINE_bool(blob_db_enable_gc, false, "Enable BlobDB garbage collection."); DEFINE_bool(blob_db_is_fifo, false, "Enable FIFO eviction strategy in BlobDB."); -DEFINE_uint64(blob_db_dir_size, 0, +DEFINE_uint64(blob_db_max_db_size, 0, "Max size limit of the directory where blob files are stored."); DEFINE_uint64(blob_db_max_ttl_range, 86400, @@ -3446,7 +3446,7 @@ void VerifyDBFromDB(std::string& truth_db_name) { blob_db::BlobDBOptions blob_db_options; blob_db_options.enable_garbage_collection = FLAGS_blob_db_enable_gc; blob_db_options.is_fifo = FLAGS_blob_db_is_fifo; - blob_db_options.blob_dir_size = FLAGS_blob_db_dir_size; + blob_db_options.max_db_size = FLAGS_blob_db_max_db_size; blob_db_options.ttl_range_secs = FLAGS_blob_db_ttl_range_secs; blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size; blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync; diff --git a/utilities/blob_db/blob_compaction_filter.cc b/utilities/blob_db/blob_compaction_filter.cc new file mode 100644 index 0000000000..cbc76a98dd --- /dev/null +++ b/utilities/blob_db/blob_compaction_filter.cc @@ -0,0 +1,117 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_compaction_filter.h" +#include "db/dbformat.h" + +namespace rocksdb { +namespace blob_db { + +namespace { + +// CompactionFilter to delete expired blob index from base DB. 
+class BlobIndexCompactionFilter : public CompactionFilter { + public: + BlobIndexCompactionFilter(BlobCompactionContext context, + uint64_t current_time, Statistics* statistics) + : context_(context), + current_time_(current_time), + statistics_(statistics) {} + + virtual ~BlobIndexCompactionFilter() { + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); + } + + virtual const char* Name() const override { + return "BlobIndexCompactionFilter"; + } + + // Filter expired blob indexes regardless of snapshots. + virtual bool IgnoreSnapshots() const override { return true; } + + virtual Decision FilterV2(int /*level*/, const Slice& key, + ValueType value_type, const Slice& value, + std::string* /*new_value*/, + std::string* /*skip_until*/) const override { + if (value_type != kBlobIndex) { + return Decision::kKeep; + } + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + // Unable to decode blob index. Keeping the value. + return Decision::kKeep; + } + if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { + // Expired + expired_count_++; + expired_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (!blob_index.IsInlined() && + blob_index.file_number() < context_.next_file_number && + context_.current_blob_files.count(blob_index.file_number()) == 0) { + // Corresponding blob file gone. Could have been garbage collected or + // evicted by FIFO eviction. 
+ evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() && + blob_index.expiration() < context_.evict_expiration_up_to) { + // Hack: Internal key is passed to BlobIndexCompactionFilter for it to + // get sequence number. + ParsedInternalKey ikey; + bool ok = ParseInternalKey(key, &ikey); + // Remove keys that could have been removed by last FIFO eviction. + // If parsing the key fails, ignore the error and continue. + if (ok && ikey.sequence < context_.fifo_eviction_seq) { + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + } + return Decision::kKeep; + } + + private: + BlobCompactionContext context_; + const uint64_t current_time_; + Statistics* statistics_; + // It is safe not to use std::atomic since the compaction filter, created + // from a compaction filter factory, will not be called from multiple threads. + mutable uint64_t expired_count_ = 0; + mutable uint64_t expired_size_ = 0; + mutable uint64_t evicted_count_ = 0; + mutable uint64_t evicted_size_ = 0; +}; + +} // anonymous namespace + +std::unique_ptr<CompactionFilter> +BlobIndexCompactionFilterFactory::CreateCompactionFilter( + const CompactionFilter::Context& /*context*/) { + int64_t current_time = 0; + Status s = env_->GetCurrentTime(&current_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + + BlobCompactionContext context; + blob_db_impl_->GetCompactionContext(&context); + + return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter( + context, static_cast<uint64_t>(current_time), statistics_)); +} + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h index 192a338ff3..7a8ea61357 100644 --- a/utilities/blob_db/blob_compaction_filter.h +++ b/utilities/blob_db/blob_compaction_filter.h @@ -5,82 +5,39 @@ #pragma once #ifndef ROCKSDB_LITE +#include <unordered_set> + 
#include "monitoring/statistics.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" +#include "utilities/blob_db/blob_db_impl.h" #include "utilities/blob_db/blob_index.h" namespace rocksdb { namespace blob_db { -// CompactionFilter to delete expired blob index from base DB. -class BlobIndexCompactionFilter : public CompactionFilter { - public: - BlobIndexCompactionFilter(uint64_t current_time, Statistics* statistics) - : current_time_(current_time), statistics_(statistics) {} - - virtual ~BlobIndexCompactionFilter() { - RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED, expired_count_); - } - - virtual const char* Name() const override { - return "BlobIndexCompactionFilter"; - } - - // Filter expired blob indexes regardless of snapshots. - virtual bool IgnoreSnapshots() const override { return true; } - - virtual Decision FilterV2(int /*level*/, const Slice& /*key*/, - ValueType value_type, const Slice& value, - std::string* /*new_value*/, - std::string* /*skip_until*/) const override { - if (value_type != kBlobIndex) { - return Decision::kKeep; - } - BlobIndex blob_index; - Status s = blob_index.DecodeFrom(value); - if (!s.ok()) { - // Unable to decode blob index. Keeping the value. - return Decision::kKeep; - } - if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { - // Expired - expired_count_++; - return Decision::kRemove; - } - return Decision::kKeep; - } - - private: - const uint64_t current_time_; - Statistics* statistics_; - // It is safe to not using std::atomic since the compaction filter, created - // from a compaction filter factroy, will not be called from multiple threads. 
- mutable uint64_t expired_count_ = 0; +struct BlobCompactionContext { + uint64_t next_file_number; + std::unordered_set current_blob_files; + SequenceNumber fifo_eviction_seq; + uint64_t evict_expiration_up_to; }; class BlobIndexCompactionFilterFactory : public CompactionFilterFactory { public: - BlobIndexCompactionFilterFactory(Env* env, Statistics* statistics) - : env_(env), statistics_(statistics) {} + BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env, + Statistics* statistics) + : blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {} virtual const char* Name() const override { return "BlobIndexCompactionFilterFactory"; } virtual std::unique_ptr CreateCompactionFilter( - const CompactionFilter::Context& /*context*/) override { - int64_t current_time = 0; - Status s = env_->GetCurrentTime(¤t_time); - if (!s.ok()) { - return nullptr; - } - assert(current_time >= 0); - return std::unique_ptr(new BlobIndexCompactionFilter( - static_cast(current_time), statistics_)); - } + const CompactionFilter::Context& /*context*/) override; private: + BlobDBImpl* blob_db_impl_; Env* env_; Statistics* statistics_; }; diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index f042db76e8..523324a763 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -63,30 +63,48 @@ Status BlobDB::Open(const DBOptions& db_options, BlobDB::BlobDB() : StackableDB(nullptr) {} void BlobDBOptions::Dump(Logger* log) const { - ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir: %s", - blob_dir.c_str()); - ROCKS_LOG_HEADER(log, " blob_db_options.path_relative: %d", - path_relative); - ROCKS_LOG_HEADER(log, " blob_db_options.is_fifo: %d", - is_fifo); - ROCKS_LOG_HEADER(log, " blob_db_options.blob_dir_size: %" PRIu64, - blob_dir_size); - ROCKS_LOG_HEADER(log, " blob_db_options.ttl_range_secs: %" PRIu32, - ttl_range_secs); - ROCKS_LOG_HEADER(log, " blob_db_options.min_blob_size: %" PRIu64, - min_blob_size); - ROCKS_LOG_HEADER(log, " 
blob_db_options.bytes_per_sync: %" PRIu64, - bytes_per_sync); - ROCKS_LOG_HEADER(log, " blob_db_options.blob_file_size: %" PRIu64, - blob_file_size); - ROCKS_LOG_HEADER(log, " blob_db_options.ttl_extractor: %p", - ttl_extractor.get()); - ROCKS_LOG_HEADER(log, " blob_db_options.compression: %d", - static_cast(compression)); - ROCKS_LOG_HEADER(log, "blob_db_options.enable_garbage_collection: %d", - enable_garbage_collection); - ROCKS_LOG_HEADER(log, " blob_db_options.disable_background_tasks: %d", - disable_background_tasks); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.blob_dir: %s", + blob_dir.c_str()); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.path_relative: %d", + path_relative); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.is_fifo: %d", + is_fifo); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.max_db_size: %" PRIu64, + max_db_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.ttl_range_secs: %" PRIu32, + ttl_range_secs); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.min_blob_size: %" PRIu64, + min_blob_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.bytes_per_sync: %" PRIu64, + bytes_per_sync); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.blob_file_size: %" PRIu64, + blob_file_size); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.ttl_extractor: %p", + ttl_extractor.get()); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.compression: %d", + static_cast(compression)); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.enable_garbage_collection: %d", + enable_garbage_collection); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.garbage_collection_interval_secs: %" PRIu64, + garbage_collection_interval_secs); + ROCKS_LOG_HEADER( + log, "BlobDBOptions.garbage_collection_deletion_size_threshold: %lf", + garbage_collection_deletion_size_threshold); + ROCKS_LOG_HEADER( + log, " BlobDBOptions.disable_background_tasks: %d", + disable_background_tasks); } } // namespace blob_db diff --git a/utilities/blob_db/blob_db.h b/utilities/blob_db/blob_db.h index 8f73711168..183d23a8cd 100644 --- 
a/utilities/blob_db/blob_db.h +++ b/utilities/blob_db/blob_db.h @@ -36,13 +36,17 @@ struct BlobDBOptions { // whether the blob_dir path is relative or absolute. bool path_relative = true; - // is the eviction strategy fifo based + // When max_db_size is reached, evict blob files to free up space + // instead of returning NoSpace error on write. Blob files will be + // evicted in this order until enough space is freed up: + // * the TTL blob file closest to expire, + // * the oldest non-TTL blob file. bool is_fifo = false; - // maximum size of the blob dir. Once this gets used, up - // evict the blob file which is oldest (is_fifo ) - // 0 means no limits - uint64_t blob_dir_size = 0; + // Maximum size of the database (including SST files and blob files). + // + // Default: 0 (no limits) + uint64_t max_db_size = 0; // a new bucket is opened, for ttl_range. So if ttl_range is 600seconds // (10 minutes), and the first bucket starts at 1471542000 @@ -198,6 +202,9 @@ class BlobDB : public StackableDB { return NewIterator(options); } + using rocksdb::StackableDB::Close; + virtual Status Close() override = 0; + // Opening blob db. 
static Status Open(const Options& options, const BlobDBOptions& bdb_options, const std::string& dbname, BlobDB** blob_db); diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 7a6874c95d..1a1a223928 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -35,6 +35,7 @@ #include "util/timer_queue.h" #include "utilities/blob_db/blob_compaction_filter.h" #include "utilities/blob_db/blob_db_iterator.h" +#include "utilities/blob_db/blob_db_listener.h" #include "utilities/blob_db/blob_index.h" namespace { @@ -44,12 +45,6 @@ int kBlockBasedTableVersionFormat = 2; namespace rocksdb { namespace blob_db { -void BlobDBFlushBeginListener::OnFlushBegin(DB* /*db*/, - const FlushJobInfo& /*info*/) { - assert(blob_db_impl_ != nullptr); - blob_db_impl_->SyncBlobFiles(); -} - WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound( unsigned long long /*log_number*/, const std::string& /*log_file_name*/, const WriteBatch& /*batch*/, WriteBatch* /*new_batch*/, @@ -59,6 +54,7 @@ WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound( bool blobf_compare_ttl::operator()(const std::shared_ptr& lhs, const std::shared_ptr& rhs) const { + assert(lhs->HasTTL() && rhs->HasTTL()); if (lhs->expiration_range_.first < rhs->expiration_range_.first) { return true; } @@ -84,12 +80,13 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, statistics_(db_options_.statistics.get()), next_file_number_(1), epoch_of_(0), - shutdown_(false), + closed_(true), open_file_count_(0), - total_blob_space_(0), - open_p1_done_(false), - debug_level_(0), - oldest_file_evicted_(false) { + total_blob_size_(0), + live_sst_size_(0), + fifo_eviction_seq_(0), + evict_expiration_up_to_(0), + debug_level_(0) { blob_dir_ = (bdb_options_.path_relative) ? 
dbname + "/" + bdb_options_.blob_dir : bdb_options_.blob_dir; @@ -98,8 +95,30 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, BlobDBImpl::~BlobDBImpl() { // CancelAllBackgroundWork(db_, true); + Status s __attribute__((__unused__)) = Close(); + assert(s.ok()); +} - Shutdown(); +Status BlobDBImpl::Close() { + if (closed_) { + return Status::OK(); + } + closed_ = true; + + // Close base DB before BlobDBImpl destructs to stop event listener and + // compaction filter call. + Status s = db_->Close(); + // delete db_ anyway even if close failed. + delete db_; + // Reset pointers to avoid StackableDB delete the pointer again. + db_ = nullptr; + db_impl_ = nullptr; + if (!s.ok()) { + return s; + } + + s = SyncBlobFiles(); + return s; } BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; } @@ -149,10 +168,9 @@ Status BlobDBImpl::Open(std::vector* handles) { } // Update options - db_options_.listeners.push_back( - std::shared_ptr(new BlobDBFlushBeginListener(this))); + db_options_.listeners.push_back(std::make_shared(this)); cf_options_.compaction_filter_factory.reset( - new BlobIndexCompactionFilterFactory(env_, statistics_)); + new BlobIndexCompactionFilterFactory(this, env_, statistics_)); // Open base db. ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_); @@ -161,6 +179,7 @@ Status BlobDBImpl::Open(std::vector* handles) { return s; } db_impl_ = static_cast_with_check(db_->GetRootDB()); + UpdateLiveSSTSize(); // Start background jobs. 
if (!bdb_options_.disable_background_tasks) { @@ -169,6 +188,7 @@ Status BlobDBImpl::Open(std::vector* handles) { ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this); bdb_options_.Dump(db_options_.info_log.get()); + closed_ = false; return s; } @@ -190,8 +210,6 @@ void BlobDBImpl::StartBackgroundTasks() { std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1)); } -void BlobDBImpl::Shutdown() { shutdown_.store(true); } - Status BlobDBImpl::GetAllBlobFiles(std::set* file_numbers) { assert(file_numbers != nullptr); std::vector all_files; @@ -241,8 +259,7 @@ Status BlobDBImpl::OpenAllBlobFiles() { Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_); if (read_metadata_status.IsCorruption()) { // Remove incomplete file. - blob_file->MarkObsolete(0 /*sequence number*/); - obsolete_files_.push_back(blob_file); + ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/); if (!obsolete_file_list.empty()) { obsolete_file_list.append(", "); } @@ -256,6 +273,8 @@ Status BlobDBImpl::OpenAllBlobFiles() { return read_metadata_status; } + total_blob_size_ += blob_file->GetFileSize(); + blob_files_[file_number] = blob_file; if (!blob_file_list.empty()) { blob_file_list.append(", "); @@ -343,25 +362,33 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr& bfile) { std::shared_ptr BlobDBImpl::FindBlobFileLocked( uint64_t expiration) const { - if (open_ttl_files_.empty()) return nullptr; + if (open_ttl_files_.empty()) { + return nullptr; + } std::shared_ptr tmp = std::make_shared(); + tmp->SetHasTTL(true); tmp->expiration_range_ = std::make_pair(expiration, 0); + tmp->file_number_ = std::numeric_limits::max(); auto citr = open_ttl_files_.equal_range(tmp); if (citr.first == open_ttl_files_.end()) { assert(citr.second == open_ttl_files_.end()); std::shared_ptr check = *(open_ttl_files_.rbegin()); - return (check->expiration_range_.second < expiration) ? 
nullptr : check; + return (check->expiration_range_.second <= expiration) ? nullptr : check; } - if (citr.first != citr.second) return *(citr.first); + if (citr.first != citr.second) { + return *(citr.first); + } auto finditr = citr.second; - if (finditr != open_ttl_files_.begin()) --finditr; + if (finditr != open_ttl_files_.begin()) { + --finditr; + } - bool b2 = (*finditr)->expiration_range_.second < expiration; + bool b2 = (*finditr)->expiration_range_.second <= expiration; bool b1 = (*finditr)->expiration_range_.first > expiration; return (b1 || b2) ? nullptr : (*finditr); @@ -426,6 +453,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFile() { blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); open_non_ttl_file_ = bfile; + total_blob_size_ += BlobLogHeader::kSize; return bfile; } @@ -500,6 +528,7 @@ std::shared_ptr BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) { blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile)); open_ttl_files_.insert(bfile); + total_blob_size_ += BlobLogHeader::kSize; epoch_of_++; return bfile; @@ -663,6 +692,7 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key, Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, const Slice& key, const Slice& value, uint64_t expiration, WriteBatch* batch) { + write_mutex_.AssertHeld(); Status s; std::string index_entry; uint32_t column_family_id = @@ -680,20 +710,27 @@ Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/, RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL); } } else { - std::shared_ptr bfile = (expiration != kNoExpiration) - ? 
SelectBlobFileTTL(expiration) - : SelectBlobFile(); - if (!bfile) { - return Status::NotFound("Blob file not found"); - } - - assert(bfile->compression() == bdb_options_.compression); std::string compression_output; Slice value_compressed = GetCompressedSlice(value, &compression_output); std::string headerbuf; Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration); + // Check DB size limit before selecting blob file to write to. + // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be + // done before calling SelectBlobFile(). + s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() + + value_compressed.size()); + if (!s.ok()) { + return s; + } + + std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration) + ? SelectBlobFileTTL(expiration) + : SelectBlobFile(); + assert(bfile != nullptr); + assert(bfile->compression() == bdb_options_.compression); + s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration, &index_entry); if (expiration == kNoExpiration) { @@ -756,66 +793,118 @@ uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value, return has_expiration ? 
expiration : kNoExpiration; } -std::shared_ptr BlobDBImpl::GetOldestBlobFile() { - std::vector> blob_files; - CopyBlobFiles(&blob_files, [](const std::shared_ptr& f) { - return !f->Obsolete() && f->Immutable(); - }); - if (blob_files.empty()) { - return nullptr; +void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) { + ReadLock l(&mutex_); + + context->next_file_number = next_file_number_.load(); + context->current_blob_files.clear(); + for (auto& p : blob_files_) { + context->current_blob_files.insert(p.first); } - blobf_compare_ttl compare; - return *std::min_element(blob_files.begin(), blob_files.end(), compare); + context->fifo_eviction_seq = fifo_eviction_seq_; + context->evict_expiration_up_to = evict_expiration_up_to_; } -bool BlobDBImpl::EvictOldestBlobFile() { - auto oldest_file = GetOldestBlobFile(); - if (oldest_file == nullptr) { - return false; +void BlobDBImpl::UpdateLiveSSTSize() { + uint64_t live_sst_size = 0; + bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size); + if (ok) { + live_sst_size_.store(live_sst_size); + ROCKS_LOG_INFO(db_options_.info_log, + "Updated total SST file size: %" PRIu64 " bytes.", + live_sst_size); + } else { + ROCKS_LOG_ERROR( + db_options_.info_log, + "Failed to update total SST file size after flush or compaction."); + } + { + // Trigger FIFO eviction if needed. + MutexLock l(&write_mutex_); + Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/); + if (s.IsNoSpace()) { + ROCKS_LOG_WARN(db_options_.info_log, + "DB grow out-of-space after SST size updated. 
Current live" + " SST size: %" PRIu64 + " , current blob files size: %" PRIu64 ".", + live_sst_size_.load(), total_blob_size_.load()); + } + } +} + +Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict) { + write_mutex_.AssertHeld(); + + uint64_t live_sst_size = live_sst_size_.load(); + if (bdb_options_.max_db_size == 0 || + live_sst_size + total_blob_size_.load() + blob_size <= + bdb_options_.max_db_size) { + return Status::OK(); } - WriteLock wl(&mutex_); - // Double check the file is not obsolete by others - if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) { - auto expiration_range = oldest_file->GetExpirationRange(); + if (bdb_options_.is_fifo == false || + (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) { + // FIFO eviction is disabled, or no space to insert new blob even we evict + // all blob files. + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); + } + + std::vector> candidate_files; + CopyBlobFiles(&candidate_files, + [&](const std::shared_ptr& blob_file) { + // Only evict TTL files + return blob_file->HasTTL(); + }); + std::sort(candidate_files.begin(), candidate_files.end(), + blobf_compare_ttl()); + std::reverse(candidate_files.begin(), candidate_files.end()); + fifo_eviction_seq_ = GetLatestSequenceNumber(); + + WriteLock l(&mutex_); + + while (!candidate_files.empty() && + live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + std::shared_ptr blob_file = candidate_files.back(); + candidate_files.pop_back(); + WriteLock file_lock(&blob_file->mutex_); + if (blob_file->Obsolete()) { + // File already obsoleted by someone else. + continue; + } + // FIFO eviction can evict open blob files. 
+ if (!blob_file->Immutable()) { + Status s = CloseBlobFile(blob_file, false /*need_lock*/); + if (!s.ok()) { + return s; + } + } + assert(blob_file->Immutable()); + auto expiration_range = blob_file->GetExpirationRange(); ROCKS_LOG_INFO(db_options_.info_log, "Evict oldest blob file since DB out of space. Current " - "space used: %" PRIu64 ", blob dir size: %" PRIu64 - ", evicted blob file #%" PRIu64 + "live SST file size: %" PRIu64 ", total blob size: %" PRIu64 + ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64 " with expiration range (%" PRIu64 ", %" PRIu64 ").", - total_blob_space_.load(), bdb_options_.blob_dir_size, - oldest_file->BlobFileNumber(), expiration_range.first, - expiration_range.second); - oldest_file->MarkObsolete(GetLatestSequenceNumber()); - obsolete_files_.push_back(oldest_file); - oldest_file_evicted_.store(true); + live_sst_size, total_blob_size_.load(), + bdb_options_.max_db_size, blob_file->BlobFileNumber(), + expiration_range.first, expiration_range.second); + ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/); + evict_expiration_up_to_ = expiration_range.first; RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED); RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED, - oldest_file->BlobCount()); + blob_file->BlobCount()); RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED, - oldest_file->GetFileSize()); + blob_file->GetFileSize()); TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted"); - return true; } - - return false; -} - -Status BlobDBImpl::CheckSize(size_t blob_size) { - uint64_t new_space_util = total_blob_space_.load() + blob_size; - if (bdb_options_.blob_dir_size > 0) { - if (!bdb_options_.is_fifo && - (new_space_util > bdb_options_.blob_dir_size)) { - return Status::NoSpace( - "Write failed, as writing it would exceed blob_dir_size limit."); - } - if (bdb_options_.is_fifo && !oldest_file_evicted_.load() && - (new_space_util > - kEvictOldestFileAtSize * bdb_options_.blob_dir_size)) { - 
EvictOldestBlobFile(); - } + if (live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); } - return Status::OK(); } @@ -823,18 +912,15 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, const Slice& value, uint64_t expiration, std::string* index_entry) { - auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size(); - Status s = CheckSize(size_put); - if (!s.ok()) { - return s; - } - + Status s; uint64_t blob_offset = 0; uint64_t key_offset = 0; { WriteLock lockbfile_w(&bfile->mutex_); std::shared_ptr writer = CheckOrCreateWriterLocked(bfile); - if (!writer) return Status::IOError("Failed to create blob writer"); + if (!writer) { + return Status::IOError("Failed to create blob writer"); + } // write the blob to the blob log. s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset, @@ -851,8 +937,9 @@ Status BlobDBImpl::AppendBlob(const std::shared_ptr& bfile, // increment blob count bfile->blob_count_++; + uint64_t size_put = headerbuf.size() + key.size() + value.size(); bfile->file_size_ += size_put; - total_blob_space_ += size_put; + total_blob_size_ += size_put; if (expiration == kNoExpiration) { BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset, @@ -1114,14 +1201,19 @@ std::pair BlobDBImpl::SanityCheck(bool aborted) { return std::make_pair(true, -1); } -Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { +Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile, + bool need_lock) { assert(bfile != nullptr); + write_mutex_.AssertHeld(); Status s; ROCKS_LOG_INFO(db_options_.info_log, "Closing blob file %" PRIu64 ". 
Path: %s", bfile->BlobFileNumber(), bfile->PathName().c_str()); { - WriteLock wl(&mutex_); + std::unique_ptr lock; + if (need_lock) { + lock.reset(new WriteLock(&mutex_)); + } if (bfile->HasTTL()) { size_t erased __attribute__((__unused__)); @@ -1134,11 +1226,16 @@ Status BlobDBImpl::CloseBlobFile(std::shared_ptr bfile) { } if (!bfile->closed_.load()) { - WriteLock lockbfile_w(&bfile->mutex_); + std::unique_ptr file_lock; + if (need_lock) { + file_lock.reset(new WriteLock(&bfile->mutex_)); + } s = bfile->WriteFooterAndCloseLocked(); } - if (!s.ok()) { + if (s.ok()) { + total_blob_size_ += BlobLogFooter::kSize; + } else { ROCKS_LOG_ERROR(db_options_.info_log, "Failed to close blob file %" PRIu64 "with error: %s", bfile->BlobFileNumber(), s.ToString().c_str()); @@ -1155,6 +1252,18 @@ Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr& bfile) { return CloseBlobFile(bfile); } +void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + // Should hold write lock of mutex_ or during DB open. 
+ blob_file->MarkObsolete(obsolete_seq); + obsolete_files_.push_back(blob_file); + assert(total_blob_size_.load() >= blob_file->GetFileSize()); + if (update_size) { + total_blob_size_ -= blob_file->GetFileSize(); + } +} + bool BlobDBImpl::VisibleToActiveSnapshot( const std::shared_ptr& bfile) { assert(bfile->Obsolete()); @@ -1198,6 +1307,7 @@ std::pair BlobDBImpl::CheckSeqFiles(bool aborted) { } } + MutexLock l(&write_mutex_); for (auto bfile : process_files) { CloseBlobFile(bfile); } @@ -1515,12 +1625,9 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, } } // end of ReadRecord loop - if (s.ok()) { - bfptr->MarkObsolete(GetLatestSequenceNumber()); - { - WriteLock wl(&mutex_); - obsolete_files_.push_back(bfptr); - } + { + WriteLock wl(&mutex_); + ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/); } ROCKS_LOG_INFO( @@ -1543,7 +1650,7 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr& bfptr, gc_stats->bytes_overwritten); RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired); if (newfile != nullptr) { - total_blob_space_ += newfile->file_size_; + total_blob_size_ += newfile->file_size_; ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", newfile->BlobFileNumber()); RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES); @@ -1600,7 +1707,6 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { } file_deleted = true; - total_blob_space_ -= bfile->file_size_; ROCKS_LOG_INFO(db_options_.info_log, "File deleted as obsolete from blob dir %s", bfile->PathName().c_str()); @@ -1611,9 +1717,6 @@ std::pair BlobDBImpl::DeleteObsoleteFiles(bool aborted) { // directory change. 
Fsync if (file_deleted) { dir_ent_->Fsync(); - - // reset oldest_file_evicted flag - oldest_file_evicted_.store(false); } // put files back into obsolete if for some reason, delete failed @@ -1734,15 +1837,24 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() { } Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr& bfile) { + MutexLock l(&write_mutex_); return CloseBlobFile(bfile); } +void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + return ObsoleteBlobFile(blob_file, obsolete_seq, update_size); +} + Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats) { return GCFileAndUpdateLSM(bfile, gc_stats); } void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } + +uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); } #endif // !NDEBUG } // namespace blob_db diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 38d1580bb3..d3e810deb0 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -43,19 +43,9 @@ struct FlushJobInfo; namespace blob_db { -class BlobFile; +struct BlobCompactionContext; class BlobDBImpl; - -class BlobDBFlushBeginListener : public EventListener { - public: - explicit BlobDBFlushBeginListener(BlobDBImpl* blob_db_impl) - : blob_db_impl_(blob_db_impl) {} - - void OnFlushBegin(DB* db, const FlushJobInfo& info) override; - - private: - BlobDBImpl* blob_db_impl_; -}; +class BlobFile; // this implements the callback from the WAL which ensures that the // blob record is present in the blob log. 
If fsync/fdatasync in not @@ -154,6 +144,8 @@ class BlobDBImpl : public BlobDB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + virtual Status Close() override; + virtual Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size, bool flush_memtable = true) override; @@ -180,6 +172,10 @@ class BlobDBImpl : public BlobDB { Status SyncBlobFiles() override; + void UpdateLiveSSTSize(); + + void GetCompactionContext(BlobCompactionContext* context); + #ifndef NDEBUG Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value); @@ -190,12 +186,18 @@ class BlobDBImpl : public BlobDB { Status TEST_CloseBlobFile(std::shared_ptr& bfile); + void TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, + SequenceNumber obsolete_seq = 0, + bool update_size = true); + Status TEST_GCFileAndUpdateLSM(std::shared_ptr& bfile, GCStats* gc_stats); void TEST_RunGC(); void TEST_DeleteObsoleteFiles(); + + uint64_t TEST_live_sst_size(); #endif // !NDEBUG private: @@ -217,11 +219,17 @@ class BlobDBImpl : public BlobDB { std::string* compression_output) const; // Close a file by appending a footer, and removes file from open files list. - Status CloseBlobFile(std::shared_ptr bfile); + Status CloseBlobFile(std::shared_ptr bfile, bool need_lock = true); // Close a file if its size exceeds blob_file_size Status CloseBlobFileIfNeeded(std::shared_ptr& bfile); + // Mark file as obsolete and move the file to obsolete file list. + // + // REQUIRED: hold write lock of mutex_ or during DB open. + void ObsoleteBlobFile(std::shared_ptr blob_file, + SequenceNumber obsolete_seq, bool update_size); + uint64_t ExtractExpiration(const Slice& key, const Slice& value, Slice* value_slice, std::string* new_value); @@ -243,8 +251,6 @@ class BlobDBImpl : public BlobDB { std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; - void Shutdown(); - // periodic sanity check. 
Bunch of checks std::pair SanityCheck(bool aborted); @@ -315,11 +321,12 @@ class BlobDBImpl : public BlobDB { uint64_t EpochNow() { return env_->NowMicros() / 1000000; } - Status CheckSize(size_t blob_size); - - std::shared_ptr GetOldestBlobFile(); - - bool EvictOldestBlobFile(); + // Check if inserting a new blob will make DB grow out of space. + // If is_fifo = true, FIFO eviction will be triggered to make room for the + // new blob. If force_evict = true, FIFO eviction will evict blob files + // even if eviction will not make enough room for the new blob. + Status CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict = false); // name of the database directory std::string dbname_; @@ -366,10 +373,10 @@ class BlobDBImpl : public BlobDB { // all the blob files which are currently being appended to based // on variety of incoming TTL's - std::multiset, blobf_compare_ttl> open_ttl_files_; + std::set, blobf_compare_ttl> open_ttl_files_; - // atomic bool to represent shutdown - std::atomic shutdown_; + // Flag to check whether Close() has been called on this DB + bool closed_; // timer based queue to execute tasks TimerQueue tqueue_; @@ -378,14 +385,25 @@ class BlobDBImpl : public BlobDB { // counter is used to monitor and close excess RA files. std::atomic open_file_count_; - // total size of all blob files at a given time - std::atomic total_blob_space_; + // Total size of all live blob files (i.e. exclude obsolete files). + std::atomic total_blob_size_; + + // total size of SST files. + std::atomic live_sst_size_; + + // Latest FIFO eviction timestamp + // + // REQUIRES: access with mutex_ lock held. + uint64_t fifo_eviction_seq_; + + // The expiration up to which latest FIFO eviction evicts. + // + // REQUIRES: access with mutex_ lock held.
+ uint64_t evict_expiration_up_to_; + std::list> obsolete_files_; - bool open_p1_done_; uint32_t debug_level_; - - std::atomic oldest_file_evicted_; }; } // namespace blob_db diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h index f901df366f..38bbfac624 100644 --- a/utilities/blob_db/blob_db_iterator.h +++ b/utilities/blob_db/blob_db_iterator.h @@ -46,28 +46,36 @@ class BlobDBIterator : public Iterator { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekToFirst(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Next(); + } } void SeekToLast() override { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekToLast(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Prev(); + } } void Seek(const Slice& target) override { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->Seek(target); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Next(); + } } void SeekForPrev(const Slice& target) override { StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); RecordTick(statistics_, BLOB_DB_NUM_SEEK); iter_->SeekForPrev(target); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Prev(); + } } void Next() override { @@ -75,7 +83,9 @@ class BlobDBIterator : public Iterator { StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS); RecordTick(statistics_, BLOB_DB_NUM_NEXT); iter_->Next(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Next(); + } } void Prev() override { @@ -83,7 +93,9 @@ class BlobDBIterator : public Iterator { StopWatch prev_sw(env_, statistics_, BLOB_DB_PREV_MICROS); RecordTick(statistics_, BLOB_DB_NUM_PREV); iter_->Prev(); - UpdateBlobValue(); + while (UpdateBlobValue()) { + iter_->Prev(); + } } Slice key() const override { @@ -102,12 +114,24 @@ class BlobDBIterator : public Iterator 
{ // Iterator::Refresh() not supported. private: - void UpdateBlobValue() { + // Return true if caller should continue to next value. + bool UpdateBlobValue() { TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1"); TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2"); value_.Reset(); - if (iter_->Valid() && iter_->IsBlob()) { - status_ = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) { + Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + if (s.IsNotFound()) { + return true; + } else { + if (!s.ok()) { + status_ = s; + } + return false; + } + return status_.IsNotFound(); + } else { + return false; } } diff --git a/utilities/blob_db/blob_db_listener.h b/utilities/blob_db/blob_db_listener.h new file mode 100644 index 0000000000..f096d238ba --- /dev/null +++ b/utilities/blob_db/blob_db_listener.h @@ -0,0 +1,46 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/listener.h" +#include "util/mutexlock.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace rocksdb { +namespace blob_db { + +class BlobDBListener : public EventListener { + public: + explicit BlobDBListener(BlobDBImpl* blob_db_impl) + : blob_db_impl_(blob_db_impl) {} + + void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->SyncBlobFiles(); + } + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + void OnCompactionCompleted(DB* /*db*/, + const CompactionJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + private: + BlobDBImpl* blob_db_impl_; +}; + +} // namespace blob_db +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 2fa8fe12ae..32e5effbbf 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -45,7 +45,10 @@ class BlobDBTest : public testing::Test { assert(s.ok()); } - ~BlobDBTest() { Destroy(); } + ~BlobDBTest() { + SyncPoint::GetInstance()->ClearAllCallBacks(); + Destroy(); + } Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(), Options options = Options()) { @@ -80,8 +83,13 @@ class BlobDBTest : public testing::Test { return reinterpret_cast(blob_db_); } - Status Put(const Slice &key, const Slice &value) { - return blob_db_->Put(WriteOptions(), key, value); + Status Put(const Slice &key, const Slice &value, + std::map *data = nullptr) { + Status s = blob_db_->Put(WriteOptions(), key, value); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; } void Delete(const std::string &key, @@ -92,6 +100,15 @@ class BlobDBTest : public testing::Test { } } + Status PutWithTTL(const Slice &key, 
const Slice &value, uint64_t ttl, + std::map *data = nullptr) { + Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; + } + Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) { return blob_db_->PutUntil(WriteOptions(), key, value, expiration); } @@ -747,7 +764,7 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { } // This test is no longer valid since we now return an error when we go -// over the configured blob_dir_size. +// over the configured max_db_size. // The test needs to be re-written later in such a way that writes continue // after a GC happens. TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { @@ -755,7 +772,7 @@ TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { Options options; options.env = mock_env_.get(); BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 100; + bdb_options.max_db_size = 100; bdb_options.blob_file_size = 100; bdb_options.min_blob_size = 0; bdb_options.disable_background_tasks = true; @@ -1038,13 +1055,14 @@ TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { } // Test to verify that a NoSpace IOError Status is returned on reaching -// blob_dir_size limit. +// max_db_size limit. TEST_F(BlobDBTest, OutOfSpace) { // Use mock env to stop wall clock. Options options; options.env = mock_env_.get(); BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 150; + bdb_options.max_db_size = 200; + bdb_options.is_fifo = false; bdb_options.disable_background_tasks = true; Open(bdb_options); @@ -1053,16 +1071,16 @@ TEST_F(BlobDBTest, OutOfSpace) { std::string value(100, 'v'); ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60)); - // Putting another blob should fail as ading it would exceed the blob_dir_size + // Putting another blob should fail as adding it would exceed the max_db_size // limit.
Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60); ASSERT_TRUE(s.IsIOError()); ASSERT_TRUE(s.IsNoSpace()); } -TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) { +TEST_F(BlobDBTest, FIFOEviction) { BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 270; + bdb_options.max_db_size = 200; bdb_options.blob_file_size = 100; bdb_options.is_fifo = true; bdb_options.disable_background_tasks = true; @@ -1078,32 +1096,36 @@ TEST_F(BlobDBTest, EvictOldestFileWhenCloseToSpaceLimit) { // So a 100 byte blob should take up 132 bytes. std::string value(100, 'v'); ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10)); + VerifyDB({{"key1", value}}); - auto *bdb_impl = static_cast(blob_db_); - auto blob_files = bdb_impl->TEST_GetBlobFiles(); - ASSERT_EQ(1, blob_files.size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); // Adding another 100 byte blob would take the total size to 264 bytes - // (2*132), which is more than 90% of blob_dir_size. So, the oldest file - // should be evicted and put in obsolete files list. + // (2*132), which exceeds max_db_size + // and triggers FIFO eviction. ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60)); - - auto obsolete_files = bdb_impl->TEST_GetObsoleteFiles(); - ASSERT_EQ(1, obsolete_files.size()); - ASSERT_TRUE(obsolete_files[0]->Immutable()); - ASSERT_EQ(blob_files[0]->BlobFileNumber(), - obsolete_files[0]->BlobFileNumber()); - - bdb_impl->TEST_DeleteObsoleteFiles(); - obsolete_files = bdb_impl->TEST_GetObsoleteFiles(); - ASSERT_TRUE(obsolete_files.empty()); ASSERT_EQ(1, evict_count); + // key1 will exist until the corresponding file is deleted.
+ VerifyDB({{"key1", value}, {"key2", value}}); + + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_TRUE(blob_files[0]->Obsolete()); + ASSERT_FALSE(blob_files[1]->Obsolete()); + auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_EQ(1, obsolete_files.size()); + ASSERT_EQ(blob_files[0], obsolete_files[0]); + + blob_db_impl()->TEST_DeleteObsoleteFiles(); + obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles(); + ASSERT_TRUE(obsolete_files.empty()); + VerifyDB({{"key2", value}}); } -TEST_F(BlobDBTest, NoOldestFileToEvict) { +TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) { Options options; BlobDBOptions bdb_options; - bdb_options.blob_dir_size = 1000; + bdb_options.max_db_size = 1000; bdb_options.blob_file_size = 5000; bdb_options.is_fifo = true; bdb_options.disable_background_tasks = true; @@ -1116,11 +1138,97 @@ TEST_F(BlobDBTest, NoOldestFileToEvict) { SyncPoint::GetInstance()->EnableProcessing(); std::string value(2000, 'v'); - ASSERT_OK(Put("foo", std::string(2000, 'v'))); - ASSERT_OK(Put("bar", std::string(2000, 'v'))); + ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace()); ASSERT_EQ(0, evict_count); } +TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) { + BlobDBOptions bdb_options; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + options.env = mock_env_.get(); + options.disable_auto_compactions = true; + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + Open(bdb_options, options); + + ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size()); + std::string small_value(50, 'v'); + std::map data; + // Insert some data into LSM tree to make sure FIFO eviction take SST + // file size into account. 
+ for (int i = 0; i < 1000; i++) { + ASSERT_OK(Put("key" + ToString(i), small_value, &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + uint64_t live_sst_size = 0; + ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize, + &live_sst_size)); + ASSERT_TRUE(live_sst_size > 0); + ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size()); + + bdb_options.max_db_size = live_sst_size + 2000; + Reopen(bdb_options, options); + ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size()); + + std::string value_1k(1000, 'v'); + ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data)); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); + // large_key2 evicts large_key1 + ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + data.erase("large_key1"); + VerifyDB(data); + // large_key3 get no enough space even after evicting large_key2, so it + // instead return no space error. + std::string value_2k(2000, 'v'); + ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + // Verify large_key2 still exists. + VerifyDB(data); +} + +// Test flush or compaction will trigger FIFO eviction since they update +// total SST file size. +TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) { + BlobDBOptions bdb_options; + bdb_options.max_db_size = 1000; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. 
+ options.env = mock_env_.get(); + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + options.compression = kNoCompression; + Open(bdb_options, options); + + std::string value(800, 'v'); + ASSERT_OK(PutWithTTL("large_key", value, 60)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB({{"large_key", value}}); + + // Insert some small keys and flush to bring DB out of space. + std::map data; + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("key" + ToString(i), "v", &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + + // Verify large_key is deleted by FIFO eviction. + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); +} + TEST_F(BlobDBTest, InlineSmallValues) { constexpr uint64_t kMaxExpiration = 1000; Random rnd(301); @@ -1197,6 +1305,7 @@ TEST_F(BlobDBTest, CompactionFilterNotSupported) { } } +// Test compaction filter should remove any expired blob index. TEST_F(BlobDBTest, FilterExpiredBlobIndex) { constexpr size_t kNumKeys = 100; constexpr size_t kNumPuts = 1000; @@ -1262,6 +1371,147 @@ TEST_F(BlobDBTest, FilterExpiredBlobIndex) { VerifyDB(data_after_compact); } +// Test compaction filter should remove any blob index where corresponding +// blob file has been removed (either by FIFO or garbage collection).
+TEST_F(BlobDBTest, FilterFileNotAvailable) { + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Options options; + options.disable_auto_compactions = true; + Open(bdb_options, options); + + ASSERT_OK(Put("foo", "v1")); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_EQ(1, blob_files[0]->BlobFileNumber()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + + ASSERT_OK(Put("bar", "v2")); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_EQ(2, blob_files[1]->BlobFileNumber()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1])); + + DB *base_db = blob_db_->GetRootDB(); + std::vector versions; + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(2, versions.size()); + ASSERT_EQ("bar", versions[0].user_key); + ASSERT_EQ("foo", versions[1].user_key); + VerifyDB({{"bar", "v2"}, {"foo", "v1"}}); + + ASSERT_OK(blob_db_->Flush(FlushOptions())); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(2, versions.size()); + ASSERT_EQ("bar", versions[0].user_key); + ASSERT_EQ("foo", versions[1].user_key); + VerifyDB({{"bar", "v2"}, {"foo", "v1"}}); + + // Remove the first blob file and compact. foo should be remove from base db. + blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(1, versions.size()); + ASSERT_EQ("bar", versions[0].user_key); + VerifyDB({{"bar", "v2"}}); + + // Remove the second blob file and compact. bar should be remove from base db. 
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_OK(GetAllKeyVersions(base_db, "", "", &versions)); + ASSERT_EQ(0, versions.size()); + VerifyDB({}); +} + +// Test compaction filter should filter any inlined TTL keys that would have +// been dropped by last FIFO eviction if they are store out-of-line. +TEST_F(BlobDBTest, FilterForFIFOEviction) { + Random rnd(215); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 100; + bdb_options.ttl_range_secs = 60; + bdb_options.max_db_size = 0; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + mock_env_->set_current_time(0); + options.env = mock_env_.get(); + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + options.disable_auto_compactions = true; + Open(bdb_options, options); + + std::map data; + std::map data_after_compact; + // Insert some small values that will be inlined. + for (int i = 0; i < 1000; i++) { + std::string key = "key" + ToString(i); + std::string value = test::RandomHumanReadableString(&rnd, 50); + uint64_t ttl = rnd.Next() % 120 + 1; + ASSERT_OK(PutWithTTL(key, value, ttl, &data)); + if (ttl >= 60) { + data_after_compact[key] = value; + } + } + uint64_t num_keys_to_evict = data.size() - data_after_compact.size(); + ASSERT_OK(blob_db_->Flush(FlushOptions())); + uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size(); + ASSERT_GT(live_sst_size, 0); + VerifyDB(data); + + bdb_options.max_db_size = live_sst_size + 30000; + bdb_options.is_fifo = true; + Reopen(bdb_options, options); + VerifyDB(data); + + // Put two large values, each on a different blob file. 
+ std::string large_value(10000, 'v'); + ASSERT_OK(PutWithTTL("large_key1", large_value, 90)); + ASSERT_OK(PutWithTTL("large_key2", large_value, 150)); + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + data["large_key1"] = large_value; + data["large_key2"] = large_value; + VerifyDB(data); + + // Put a third large value which will bring the DB out of space. + // FIFO eviction will evict the file of large_key1. + ASSERT_OK(PutWithTTL("large_key3", large_value, 150)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + data.erase("large_key1"); + data["large_key3"] = large_value; + VerifyDB(data); + + // Putting some more small values. These values shouldn't be evicted by + // compaction filter since they are inserted after FIFO eviction. + ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact)); + ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact)); + + // FIFO eviction doesn't trigger again since there enough room for the flush. + ASSERT_OK(blob_db_->Flush(FlushOptions())); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + + // Manual compact and check if compaction filter evict those keys with + // expiration < 60. + ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + // All keys with expiration < 60, plus large_key1 is filtered by + // compaction filter. 
+ ASSERT_EQ(num_keys_to_evict + 1, + statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT)); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + data_after_compact["large_key2"] = large_value; + data_after_compact["large_key3"] = large_value; + VerifyDB(data_after_compact); +} + } // namespace blob_db } // namespace rocksdb