mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-25 22:44:05 +00:00
Add statistics for BlobDB GC (#6296)
Summary: The patch adds statistics support to the new BlobDB garbage collection implementation; namely, it adds support for the following (pre-existing) tickers: `BLOB_DB_GC_NUM_FILES`: the number of blob files obsoleted by the GC logic. `BLOB_DB_GC_NUM_NEW_FILES`: the number of new blob files generated by the GC logic. `BLOB_DB_GC_FAILURES`: the number of failed GC passes (where a GC pass is equivalent to a (sub)compaction). `BLOB_DB_GC_NUM_KEYS_RELOCATED`: the number of blobs relocated to new blob files by the GC logic. `BLOB_DB_GC_BYTES_RELOCATED`: the total size of blobs relocated to new blob files. The tickers `BLOB_DB_GC_NUM_KEYS_OVERWRITTEN`, `BLOB_DB_GC_NUM_KEYS_EXPIRED`, `BLOB_DB_GC_BYTES_OVERWRITTEN`, `BLOB_DB_GC_BYTES_EXPIRED`, and `BLOB_DB_GC_MICROS` are not relevant for the new GC logic, and are thus marked deprecated. The patch also adds a couple of log messages that log the number and total size of blobs encountered and relocated during a GC pass, as well as the number of blob files created and obsoleted. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6296 Test Plan: Extended unit tests and used the BlobDB mode of `db_bench`. Differential Revision: D19402513 Pulled By: ltamasi fbshipit-source-id: d53d2bfbf4928a1db1e9346c67ebb9007b8932ec
This commit is contained in:
parent
71874c5aaf
commit
9e3ace42a4
|
@ -4,6 +4,9 @@
|
|||
* Fix incorrect results while block-based table uses kHashSearch, together with Prev()/SeekForPrev().
|
||||
* Fix a bug that prevents opening a DB after two consecutive crash with TransactionDB, where the first crash recovers from a corrupted WAL with kPointInTimeRecovery but the second cannot.
|
||||
|
||||
### Public API Change
|
||||
* The BlobDB garbage collector now emits the statistics `BLOB_DB_GC_NUM_FILES` (number of blob files obsoleted during GC), `BLOB_DB_GC_NUM_NEW_FILES` (number of new blob files generated during GC), `BLOB_DB_GC_FAILURES` (number of failed GC passes), `BLOB_DB_GC_NUM_KEYS_RELOCATED` (number of blobs relocated during GC), and `BLOB_DB_GC_BYTES_RELOCATED` (total size of blobs relocated during GC). On the other hand, the following statistics, which are not relevant for the new GC implementation, are now deprecated: `BLOB_DB_GC_NUM_KEYS_OVERWRITTEN`, `BLOB_DB_GC_NUM_KEYS_EXPIRED`, `BLOB_DB_GC_BYTES_OVERWRITTEN`, `BLOB_DB_GC_BYTES_EXPIRED`, and `BLOB_DB_GC_MICROS`.
|
||||
|
||||
## 6.7.0 (01/21/2020)
|
||||
### Public API Change
|
||||
* Added a rocksdb::FileSystem class in include/rocksdb/file_system.h to encapsulate file creation/read/write operations, and an option DBOptions::file_system to allow a user to pass in an instance of rocksdb::FileSystem. If its a non-null value, this will take precendence over DBOptions::env for file operations. A new API rocksdb::FileSystem::Default() returns a platform default object. The DBOptions::env option and Env::Default() API will continue to be used for threading and other OS related functions, and where DBOptions::file_system is not specified, for file operations. For storage developers who are accustomed to rocksdb::Env, the interface in rocksdb::FileSystem is new and will probably undergo some changes as more storage systems are ported to it from rocksdb::Env. As of now, no env other than Posix has been ported to the new interface.
|
||||
|
|
|
@ -287,23 +287,25 @@ enum Tickers : uint32_t {
|
|||
// size of blob index evicted from base DB by BlobDB compaction filter
|
||||
// because of corresponding file deleted.
|
||||
BLOB_DB_BLOB_INDEX_EVICTED_SIZE,
|
||||
// # of blob files being garbage collected.
|
||||
// # of blob files that were obsoleted by garbage collection.
|
||||
BLOB_DB_GC_NUM_FILES,
|
||||
// # of blob files generated by garbage collection.
|
||||
BLOB_DB_GC_NUM_NEW_FILES,
|
||||
// # of BlobDB garbage collection failures.
|
||||
BLOB_DB_GC_FAILURES,
|
||||
// # of keys drop by BlobDB garbage collection because they had been
|
||||
// overwritten.
|
||||
// # of keys dropped by BlobDB garbage collection because they had been
|
||||
// overwritten. DEPRECATED.
|
||||
BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
|
||||
// # of keys drop by BlobDB garbage collection because of expiration.
|
||||
// # of keys dropped by BlobDB garbage collection because of expiration.
|
||||
// DEPRECATED.
|
||||
BLOB_DB_GC_NUM_KEYS_EXPIRED,
|
||||
// # of keys relocated to new blob file by garbage collection.
|
||||
BLOB_DB_GC_NUM_KEYS_RELOCATED,
|
||||
// # of bytes drop by BlobDB garbage collection because they had been
|
||||
// overwritten.
|
||||
// # of bytes dropped by BlobDB garbage collection because they had been
|
||||
// overwritten. DEPRECATED.
|
||||
BLOB_DB_GC_BYTES_OVERWRITTEN,
|
||||
// # of bytes drop by BlobDB garbage collection because of expiration.
|
||||
// # of bytes dropped by BlobDB garbage collection because of expiration.
|
||||
// DEPRECATED.
|
||||
BLOB_DB_GC_BYTES_EXPIRED,
|
||||
// # of bytes relocated to new blob file by garbage collection.
|
||||
BLOB_DB_GC_BYTES_RELOCATED,
|
||||
|
@ -420,7 +422,7 @@ enum Histograms : uint32_t {
|
|||
BLOB_DB_BLOB_FILE_READ_MICROS,
|
||||
// Blob file sync latency.
|
||||
BLOB_DB_BLOB_FILE_SYNC_MICROS,
|
||||
// BlobDB garbage collection time.
|
||||
// BlobDB garbage collection time. DEPRECATED.
|
||||
BLOB_DB_GC_MICROS,
|
||||
// BlobDB compression time.
|
||||
BLOB_DB_COMPRESSION_MICROS,
|
||||
|
|
|
@ -8,6 +8,8 @@
|
|||
#include "utilities/blob_db/blob_compaction_filter.h"
|
||||
#include "db/dbformat.h"
|
||||
|
||||
#include <cinttypes>
|
||||
|
||||
namespace rocksdb {
|
||||
namespace blob_db {
|
||||
|
||||
|
@ -54,6 +56,30 @@ CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
|
|||
return Decision::kKeep;
|
||||
}
|
||||
|
||||
BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
|
||||
if (blob_file_) {
|
||||
CloseAndRegisterNewBlobFile();
|
||||
}
|
||||
|
||||
assert(context_gc_.blob_db_impl);
|
||||
|
||||
ROCKS_LOG_INFO(context_gc_.blob_db_impl->db_options_.info_log,
|
||||
"GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64
|
||||
" bytes), relocated %" PRIu64 " blobs (%" PRIu64
|
||||
" bytes), created %" PRIu64 " new blob file(s)",
|
||||
!gc_stats_.HasError() ? "successfully" : "with failure",
|
||||
gc_stats_.AllBlobs(), gc_stats_.AllBytes(),
|
||||
gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(),
|
||||
gc_stats_.NewFiles());
|
||||
|
||||
RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED,
|
||||
gc_stats_.RelocatedBlobs());
|
||||
RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED,
|
||||
gc_stats_.RelocatedBytes());
|
||||
RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles());
|
||||
RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError());
|
||||
}
|
||||
|
||||
CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
|
||||
const Slice& key, const Slice& existing_value,
|
||||
std::string* new_value) const {
|
||||
|
@ -68,13 +94,18 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
|
|||
BlobIndex blob_index;
|
||||
const Status s = blob_index.DecodeFrom(existing_value);
|
||||
if (!s.ok()) {
|
||||
gc_stats_.SetError();
|
||||
return BlobDecision::kCorruption;
|
||||
}
|
||||
|
||||
if (blob_index.IsInlined()) {
|
||||
gc_stats_.AddBlob(blob_index.value().size());
|
||||
|
||||
return BlobDecision::kKeep;
|
||||
}
|
||||
|
||||
gc_stats_.AddBlob(blob_index.size());
|
||||
|
||||
if (blob_index.HasTTL()) {
|
||||
return BlobDecision::kKeep;
|
||||
}
|
||||
|
@ -88,28 +119,34 @@ CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
|
|||
// is bounded though (determined by the number of compactions and the blob
|
||||
// file size option).
|
||||
if (!OpenNewBlobFileIfNeeded()) {
|
||||
gc_stats_.SetError();
|
||||
return BlobDecision::kIOError;
|
||||
}
|
||||
|
||||
PinnableSlice blob;
|
||||
CompressionType compression_type = kNoCompression;
|
||||
if (!ReadBlobFromOldFile(key, blob_index, &blob, &compression_type)) {
|
||||
gc_stats_.SetError();
|
||||
return BlobDecision::kIOError;
|
||||
}
|
||||
|
||||
uint64_t new_blob_file_number = 0;
|
||||
uint64_t new_blob_offset = 0;
|
||||
if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) {
|
||||
gc_stats_.SetError();
|
||||
return BlobDecision::kIOError;
|
||||
}
|
||||
|
||||
if (!CloseAndRegisterNewBlobFileIfNeeded()) {
|
||||
gc_stats_.SetError();
|
||||
return BlobDecision::kIOError;
|
||||
}
|
||||
|
||||
BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
|
||||
blob.size(), compression_type);
|
||||
|
||||
gc_stats_.AddRelocatedBlob(blob_index.size());
|
||||
|
||||
return BlobDecision::kChangeValue;
|
||||
}
|
||||
|
||||
|
@ -135,6 +172,8 @@ bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
|
|||
assert(blob_file_);
|
||||
assert(writer_);
|
||||
|
||||
gc_stats_.AddNewFile();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include "monitoring/statistics.h"
|
||||
#include "rocksdb/compaction_filter.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "utilities/blob_db/blob_db_gc_stats.h"
|
||||
#include "utilities/blob_db/blob_db_impl.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
@ -33,10 +34,10 @@ struct BlobCompactionContextGC {
|
|||
class BlobIndexCompactionFilterBase : public CompactionFilter {
|
||||
public:
|
||||
BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
|
||||
uint64_t current_time, Statistics* statistics)
|
||||
uint64_t current_time, Statistics* stats)
|
||||
: context_(std::move(context)),
|
||||
current_time_(current_time),
|
||||
statistics_(statistics) {}
|
||||
statistics_(stats) {}
|
||||
|
||||
~BlobIndexCompactionFilterBase() override {
|
||||
RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
|
||||
|
@ -52,6 +53,9 @@ class BlobIndexCompactionFilterBase : public CompactionFilter {
|
|||
const Slice& value, std::string* /*new_value*/,
|
||||
std::string* /*skip_until*/) const override;
|
||||
|
||||
protected:
|
||||
Statistics* statistics() const { return statistics_; }
|
||||
|
||||
private:
|
||||
BlobCompactionContext context_;
|
||||
const uint64_t current_time_;
|
||||
|
@ -67,9 +71,9 @@ class BlobIndexCompactionFilterBase : public CompactionFilter {
|
|||
class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
|
||||
public:
|
||||
BlobIndexCompactionFilter(BlobCompactionContext&& context,
|
||||
uint64_t current_time, Statistics* statistics)
|
||||
: BlobIndexCompactionFilterBase(std::move(context), current_time,
|
||||
statistics) {}
|
||||
uint64_t current_time, Statistics* stats)
|
||||
: BlobIndexCompactionFilterBase(std::move(context), current_time, stats) {
|
||||
}
|
||||
|
||||
const char* Name() const override { return "BlobIndexCompactionFilter"; }
|
||||
};
|
||||
|
@ -78,16 +82,11 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
|
|||
public:
|
||||
BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
|
||||
BlobCompactionContextGC&& context_gc,
|
||||
uint64_t current_time, Statistics* statistics)
|
||||
: BlobIndexCompactionFilterBase(std::move(context), current_time,
|
||||
statistics),
|
||||
uint64_t current_time, Statistics* stats)
|
||||
: BlobIndexCompactionFilterBase(std::move(context), current_time, stats),
|
||||
context_gc_(std::move(context_gc)) {}
|
||||
|
||||
~BlobIndexCompactionFilterGC() override {
|
||||
if (blob_file_) {
|
||||
CloseAndRegisterNewBlobFile();
|
||||
}
|
||||
}
|
||||
~BlobIndexCompactionFilterGC() override;
|
||||
|
||||
const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
|
||||
|
||||
|
@ -109,6 +108,7 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
|
|||
BlobCompactionContextGC context_gc_;
|
||||
mutable std::shared_ptr<BlobFile> blob_file_;
|
||||
mutable std::shared_ptr<Writer> writer_;
|
||||
mutable BlobDBGarbageCollectionStats gc_stats_;
|
||||
};
|
||||
|
||||
// Compaction filter factory; similarly to the filters above, it comes
|
||||
|
|
52
utilities/blob_db/blob_db_gc_stats.h
Normal file
52
utilities/blob_db/blob_db_gc_stats.h
Normal file
|
@ -0,0 +1,52 @@
|
|||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
#pragma once
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
namespace blob_db {
|
||||
|
||||
/**
|
||||
* Statistics related to a single garbage collection pass (i.e. a single
|
||||
* (sub)compaction).
|
||||
*/
|
||||
class BlobDBGarbageCollectionStats {
|
||||
public:
|
||||
uint64_t AllBlobs() const { return all_blobs_; }
|
||||
uint64_t AllBytes() const { return all_bytes_; }
|
||||
uint64_t RelocatedBlobs() const { return relocated_blobs_; }
|
||||
uint64_t RelocatedBytes() const { return relocated_bytes_; }
|
||||
uint64_t NewFiles() const { return new_files_; }
|
||||
bool HasError() const { return error_; }
|
||||
|
||||
void AddBlob(uint64_t size) {
|
||||
++all_blobs_;
|
||||
all_bytes_ += size;
|
||||
}
|
||||
|
||||
void AddRelocatedBlob(uint64_t size) {
|
||||
++relocated_blobs_;
|
||||
relocated_bytes_ += size;
|
||||
}
|
||||
|
||||
void AddNewFile() { ++new_files_; }
|
||||
|
||||
void SetError() { error_ = true; }
|
||||
|
||||
private:
|
||||
uint64_t all_blobs_ = 0;
|
||||
uint64_t all_bytes_ = 0;
|
||||
uint64_t relocated_blobs_ = 0;
|
||||
uint64_t relocated_bytes_ = 0;
|
||||
uint64_t new_files_ = 0;
|
||||
bool error_ = false;
|
||||
};
|
||||
|
||||
} // namespace blob_db
|
||||
} // namespace rocksdb
|
||||
#endif // ROCKSDB_LITE
|
|
@ -540,6 +540,8 @@ void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
|
|||
// Note: we need to stop as soon as we find a blob file that has any
|
||||
// linked SSTs (or one potentially referenced by memtables).
|
||||
|
||||
uint64_t obsoleted_files = 0;
|
||||
|
||||
auto it = live_imm_non_ttl_blob_files_.begin();
|
||||
while (it != live_imm_non_ttl_blob_files_.end()) {
|
||||
const auto& blob_file = it->second;
|
||||
|
@ -560,6 +562,15 @@ void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
|
|||
}
|
||||
|
||||
it = live_imm_non_ttl_blob_files_.erase(it);
|
||||
|
||||
++obsoleted_files;
|
||||
}
|
||||
|
||||
if (obsoleted_files > 0) {
|
||||
ROCKS_LOG_INFO(db_options_.info_log,
|
||||
"%" PRIu64 " blob file(s) marked obsolete by GC",
|
||||
obsoleted_files);
|
||||
RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1371,6 +1371,7 @@ TEST_F(BlobDBTest, GarbageCollection) {
|
|||
|
||||
Options options;
|
||||
options.env = mock_env_.get();
|
||||
options.statistics = CreateDBStatistics();
|
||||
|
||||
Open(bdb_options, options);
|
||||
|
||||
|
@ -1504,6 +1505,17 @@ TEST_F(BlobDBTest, GarbageCollection) {
|
|||
|
||||
VerifyBaseDBBlobIndex(blob_index_versions);
|
||||
|
||||
const Statistics *const statistics = options.statistics.get();
|
||||
assert(statistics);
|
||||
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), cutoff);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), cutoff);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 0);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED),
|
||||
cutoff * kBlobsPerFile);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED),
|
||||
cutoff * kBlobsPerFile * kLargeValueSize);
|
||||
|
||||
// At this point, we should have 128 immutable non-TTL files with file numbers
|
||||
// 33..128 and 130..161. (129 was taken by the TTL blob file.)
|
||||
{
|
||||
|
@ -1522,6 +1534,47 @@ TEST_F(BlobDBTest, GarbageCollection) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(BlobDBTest, GarbageCollectionFailure) {
|
||||
BlobDBOptions bdb_options;
|
||||
bdb_options.min_blob_size = 0;
|
||||
bdb_options.enable_garbage_collection = true;
|
||||
bdb_options.garbage_collection_cutoff = 1.0;
|
||||
bdb_options.disable_background_tasks = true;
|
||||
|
||||
Options db_options;
|
||||
db_options.statistics = CreateDBStatistics();
|
||||
|
||||
Open(bdb_options, db_options);
|
||||
|
||||
// Write a couple of valid blobs.
|
||||
Put("foo", "bar");
|
||||
Put("dead", "beef");
|
||||
|
||||
// Write a fake blob reference into the base DB that cannot be parsed.
|
||||
WriteBatch batch;
|
||||
ASSERT_OK(WriteBatchInternal::PutBlobIndex(
|
||||
&batch, blob_db_->DefaultColumnFamily()->GetID(), "key",
|
||||
"not a valid blob index"));
|
||||
ASSERT_OK(blob_db_->GetRootDB()->Write(WriteOptions(), &batch));
|
||||
|
||||
auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
|
||||
ASSERT_EQ(blob_files.size(), 1);
|
||||
auto blob_file = blob_files[0];
|
||||
ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
|
||||
|
||||
ASSERT_TRUE(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)
|
||||
.IsCorruption());
|
||||
|
||||
const Statistics *const statistics = db_options.statistics.get();
|
||||
assert(statistics);
|
||||
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_FILES), 0);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES), 1);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_FAILURES), 1);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED), 2);
|
||||
ASSERT_EQ(statistics->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED), 7);
|
||||
}
|
||||
|
||||
// File should be evicted after expiration.
|
||||
TEST_F(BlobDBTest, EvictExpiredFile) {
|
||||
BlobDBOptions bdb_options;
|
||||
|
|
Loading…
Reference in a new issue