mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-27 11:43:49 +00:00
5d68243e61
Summary: Submitting on behalf of another employee. Closes https://github.com/facebook/rocksdb/pull/3557 Differential Revision: D7146025 Pulled By: ajkr fbshipit-source-id: 495ca5db5beec3789e671e26f78170957704e77e
1751 lines
58 KiB
C++
1751 lines
58 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include "utilities/blob_db/blob_db_impl.h"
|
|
#include <algorithm>
|
|
#include <cinttypes>
|
|
#include <iomanip>
|
|
#include <memory>
|
|
|
|
#include "db/db_impl.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "monitoring/instrumented_mutex.h"
|
|
#include "monitoring/statistics.h"
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/iterator.h"
|
|
#include "rocksdb/utilities/stackable_db.h"
|
|
#include "rocksdb/utilities/transaction.h"
|
|
#include "table/block.h"
|
|
#include "table/block_based_table_builder.h"
|
|
#include "table/block_builder.h"
|
|
#include "table/meta_blocks.h"
|
|
#include "util/cast_util.h"
|
|
#include "util/crc32c.h"
|
|
#include "util/file_reader_writer.h"
|
|
#include "util/filename.h"
|
|
#include "util/logging.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/random.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/sync_point.h"
|
|
#include "util/timer_queue.h"
|
|
#include "utilities/blob_db/blob_compaction_filter.h"
|
|
#include "utilities/blob_db/blob_db_iterator.h"
|
|
#include "utilities/blob_db/blob_index.h"
|
|
|
|
namespace {
// Block-based table format version handed to CompressBlock() when blob values
// are compressed. It is never modified, so it is a compile-time constant
// rather than a mutable global.
constexpr int kBlockBasedTableVersionFormat = 2;
}  // end namespace
|
|
|
|
namespace rocksdb {
|
|
namespace blob_db {
|
|
|
|
// Flush-begin hook: asks the associated BlobDBImpl to sync its blob files.
// Neither the database handle nor the flush job info is used.
void BlobDBFlushBeginListener::OnFlushBegin(DB* /*db*/,
                                            const FlushJobInfo& /*info*/) {
  auto* const impl = blob_db_impl_;
  // The listener is expected to always be bound to a BlobDBImpl.
  assert(impl != nullptr);
  impl->SyncBlobFiles();
}
|
|
|
|
// WAL filter callback. All arguments are ignored; every record is accepted
// unchanged and processing continues.
WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
    unsigned long long /*log_number*/, const std::string& /*log_file_name*/,
    const WriteBatch& /*batch*/, WriteBatch* /*new_batch*/,
    bool* /*batch_changed*/) {
  const WalFilter::WalProcessingOption verdict =
      WalFilter::WalProcessingOption::kContinueProcessing;
  return verdict;
}
|
|
|
|
bool blobf_compare_ttl::operator()(const std::shared_ptr<BlobFile>& lhs,
|
|
const std::shared_ptr<BlobFile>& rhs) const {
|
|
if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
|
|
return true;
|
|
}
|
|
if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
|
|
return false;
|
|
}
|
|
return lhs->BlobFileNumber() < rhs->BlobFileNumber();
|
|
}
|
|
|
|
// Captures the supplied options and initializes counters and flags. No files
// are touched here; the database itself is opened later by Open().
//
// dbname:          path of the base database.
// blob_db_options: BlobDB-specific options (blob directory, TTL extractor,
//                  bytes_per_sync, ...).
// db_options:      options for the base DB; also supplies the Env and the
//                  statistics object.
// cf_options:      options for the default column family.
BlobDBImpl::BlobDBImpl(const std::string& dbname,
                       const BlobDBOptions& blob_db_options,
                       const DBOptions& db_options,
                       const ColumnFamilyOptions& cf_options)
    : BlobDB(),
      dbname_(dbname),
      db_impl_(nullptr),
      env_(db_options.env),
      ttl_extractor_(blob_db_options.ttl_extractor.get()),
      bdb_options_(blob_db_options),
      db_options_(db_options),
      cf_options_(cf_options),
      env_options_(db_options),
      statistics_(db_options_.statistics.get()),
      next_file_number_(1),
      epoch_of_(0),
      shutdown_(false),
      open_file_count_(0),
      total_blob_space_(0),
      open_p1_done_(false),
      debug_level_(0),
      oldest_file_evicted_(false) {
  // The blob directory is either relative to the DB path or used verbatim.
  if (bdb_options_.path_relative) {
    blob_dir_ = dbname + "/" + bdb_options_.blob_dir;
  } else {
    blob_dir_ = bdb_options_.blob_dir;
  }
  // Blob files use the BlobDB-specific sync interval.
  env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
}
|
|
|
|
// Destructor: signals shutdown via Shutdown().
BlobDBImpl::~BlobDBImpl() {
  // Cancelling background work explicitly is currently disabled:
  // CancelAllBackgroundWork(db_, true);
  Shutdown();
}
|
|
|
|
// Returns a copy of the BlobDB options this instance was constructed with.
BlobDBOptions BlobDBImpl::GetBlobDBOptions() const {
  return bdb_options_;
}
|
|
|
|
// Opens the blob directory, loads the blob files found there, then opens the
// base DB with the BlobDB flush listener and blob-index compaction filter
// factory installed. Background tasks are started unless disabled in the
// BlobDB options.
//
// handles: receives the column family handles from DB::Open(); must not be
//          null.
// Returns NotSupported when no blob directory is configured or when the
// caller supplied its own compaction filter / factory; otherwise the first
// failing status from logger creation, blob directory setup, blob file
// loading or DB::Open().
Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  assert(db_ == nullptr);
  if (blob_dir_.empty()) {
    return Status::NotSupported("No blob directory in options");
  }
  if (cf_options_.compaction_filter != nullptr ||
      cf_options_.compaction_filter_factory != nullptr) {
    return Status::NotSupported("Blob DB doesn't support compaction filter.");
  }

  Status s;

  // Create info log.
  if (db_options_.info_log == nullptr) {
    s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
    if (!s.ok()) {
      return s;
    }
  }

  ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");

  // Open blob directory.
  s = env_->CreateDirIfMissing(blob_dir_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to create blob_dir %s, status: %s",
                    blob_dir_.c_str(), s.ToString().c_str());
    // Report the creation failure itself instead of letting a later step
    // overwrite (or mask) it.
    return s;
  }
  s = env_->NewDirectory(blob_dir_, &dir_ent_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
                    s.ToString().c_str());
    return s;
  }

  // Open blob files.
  s = OpenAllBlobFiles();
  if (!s.ok()) {
    return s;
  }

  // Update options
  db_options_.listeners.push_back(
      std::shared_ptr<EventListener>(new BlobDBFlushBeginListener(this)));
  cf_options_.compaction_filter_factory.reset(
      new BlobIndexCompactionFilterFactory(env_, statistics_));

  // Open base db.
  ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
  s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
  if (!s.ok()) {
    return s;
  }
  db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());

  // Start background jobs.
  if (!bdb_options_.disable_background_tasks) {
    StartBackgroundTasks();
  }

  // %p requires a void* argument.
  ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p",
                 static_cast<void*>(this));
  bdb_options_.Dump(db_options_.info_log.get());
  return s;
}
|
|
|
|
void BlobDBImpl::StartBackgroundTasks() {
|
|
// store a call to a member function and object
|
|
tqueue_.add(
|
|
kReclaimOpenFilesPeriodMillisecs,
|
|
std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
|
|
tqueue_.add(static_cast<int64_t>(
|
|
bdb_options_.garbage_collection_interval_secs * 1000),
|
|
std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1));
|
|
tqueue_.add(
|
|
kDeleteObsoleteFilesPeriodMillisecs,
|
|
std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
|
|
tqueue_.add(kSanityCheckPeriodMillisecs,
|
|
std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
|
|
tqueue_.add(
|
|
kCheckSeqFilesPeriodMillisecs,
|
|
std::bind(&BlobDBImpl::CheckSeqFiles, this, std::placeholders::_1));
|
|
}
|
|
|
|
// Raises the shutdown flag. Nothing else is done here.
void BlobDBImpl::Shutdown() {
  shutdown_.store(true);
}
|
|
|
|
// Lists the blob directory and collects the file numbers of all entries that
// parse as blob files. Entries that do not parse, or that are of another file
// type, are skipped with a warning.
//
// file_numbers: output set; must not be null. Numbers are inserted into it.
// Returns the status of the directory listing.
Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
  assert(file_numbers != nullptr);
  std::vector<std::string> children;
  Status s = env_->GetChildren(blob_dir_, &children);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to get list of blob files, status: %s",
                    s.ToString().c_str());
    return s;
  }

  for (const auto& child : children) {
    uint64_t number;
    FileType file_type;
    const bool parsed = ParseFileName(child, &number, &file_type);
    if (!parsed || file_type != kBlobFile) {
      ROCKS_LOG_WARN(db_options_.info_log,
                     "Skipping file in blob directory: %s", child.c_str());
      continue;
    }
    file_numbers->insert(number);
  }

  return s;
}
|
|
|
|
// Scans the blob directory and registers every blob file found there.
//
// Each file is created as an immutable BlobFile and its metadata is read.
// Files whose metadata read reports corruption are marked obsolete and queued
// in obsolete_files_; any other read failure aborts the scan and is returned.
// next_file_number_ is advanced past the largest file number found.
//
// Returns the status of the directory listing, or the first non-corruption
// metadata read error.
Status BlobDBImpl::OpenAllBlobFiles() {
  std::set<uint64_t> file_numbers;
  Status s = GetAllBlobFiles(&file_numbers);
  if (!s.ok()) {
    return s;
  }

  if (!file_numbers.empty()) {
    next_file_number_.store(*file_numbers.rbegin() + 1);
  }

  // Comma-separated file number lists, used only for the log lines below.
  std::string blob_file_list;
  std::string obsolete_file_list;

  for (auto& file_number : file_numbers) {
    std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
        this, blob_dir_, file_number, db_options_.info_log.get());
    blob_file->MarkImmutable();

    // Read file header and footer
    Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
    if (read_metadata_status.IsCorruption()) {
      // Remove incomplete file.
      blob_file->MarkObsolete(0 /*sequence number*/);
      obsolete_files_.push_back(blob_file);
      if (!obsolete_file_list.empty()) {
        obsolete_file_list.append(", ");
      }
      obsolete_file_list.append(ToString(file_number));
      continue;
    } else if (!read_metadata_status.ok()) {
      // The conversion must be "%" PRIu64 with no space flag in between.
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Unable to read metadata of blob file %" PRIu64
                      ", status: '%s'",
                      file_number, read_metadata_status.ToString().c_str());
      return read_metadata_status;
    }

    blob_files_[file_number] = blob_file;
    if (!blob_file_list.empty()) {
      blob_file_list.append(", ");
    }
    blob_file_list.append(ToString(file_number));
  }

  ROCKS_LOG_INFO(db_options_.info_log,
                 "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
                 blob_file_list.c_str());
  ROCKS_LOG_INFO(db_options_.info_log,
                 "Found %" ROCKSDB_PRIszt
                 " incomplete or corrupted blob files: %s",
                 obsolete_files_.size(), obsolete_file_list.c_str());
  return s;
}
|
|
|
|
void BlobDBImpl::CloseRandomAccessLocked(
|
|
const std::shared_ptr<BlobFile>& bfile) {
|
|
bfile->CloseRandomAccessLocked();
|
|
open_file_count_--;
|
|
}
|
|
|
|
std::shared_ptr<RandomAccessFileReader> BlobDBImpl::GetOrOpenRandomAccessReader(
|
|
const std::shared_ptr<BlobFile>& bfile, Env* env,
|
|
const EnvOptions& env_options) {
|
|
bool fresh_open = false;
|
|
auto rar = bfile->GetOrOpenRandomAccessReader(env, env_options, &fresh_open);
|
|
if (fresh_open) open_file_count_++;
|
|
return rar;
|
|
}
|
|
|
|
std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
|
|
uint64_t file_num = next_file_number_++;
|
|
auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
|
|
db_options_.info_log.get());
|
|
ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
|
|
bfile->PathName().c_str(), reason.c_str());
|
|
LogFlush(db_options_.info_log);
|
|
return bfile;
|
|
}
|
|
|
|
// Opens the file behind `bfile` for appending and attaches a log Writer to
// it. The writer starts at the blob file's current size, and its last element
// type is derived from that size: exactly a header, header plus records, or
// empty.
//
// Returns the failure from reopening the file, or Corruption when the file is
// non-empty but smaller than a blob log header.
Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
  std::string fpath(bfile->PathName());
  std::unique_ptr<WritableFile> wfile;

  Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to open blob file for write: %s status: '%s'"
                    " exists: '%s'",
                    fpath.c_str(), s.ToString().c_str(),
                    env_->FileExists(fpath).ToString().c_str());
    return s;
  }

  std::unique_ptr<WritableFileWriter> fwriter;
  fwriter.reset(new WritableFileWriter(std::move(wfile), env_options_));

  uint64_t boffset = bfile->GetFileSize();
  if (debug_level_ >= 2 && boffset) {
    // boffset is a uint64_t, so it needs the 64-bit unsigned conversion.
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
                    boffset);
  }

  Writer::ElemType et = Writer::kEtNone;
  if (bfile->file_size_ == BlobLogHeader::kSize) {
    et = Writer::kEtFileHdr;
  } else if (bfile->file_size_ > BlobLogHeader::kSize) {
    et = Writer::kEtRecord;
  } else if (bfile->file_size_) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Open blob file: %s with wrong size: %" PRIu64,
                   fpath.c_str(), boffset);
    return Status::Corruption("Invalid blob file size");
  }

  bfile->log_writer_ = std::make_shared<Writer>(
      std::move(fwriter), env_, statistics_, bfile->file_number_,
      bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
  bfile->log_writer_->last_elem_type_ = et;

  return s;
}
|
|
|
|
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
|
|
uint64_t expiration) const {
|
|
if (open_ttl_files_.empty()) return nullptr;
|
|
|
|
std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
|
|
tmp->expiration_range_ = std::make_pair(expiration, 0);
|
|
|
|
auto citr = open_ttl_files_.equal_range(tmp);
|
|
if (citr.first == open_ttl_files_.end()) {
|
|
assert(citr.second == open_ttl_files_.end());
|
|
|
|
std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
|
|
return (check->expiration_range_.second < expiration) ? nullptr : check;
|
|
}
|
|
|
|
if (citr.first != citr.second) return *(citr.first);
|
|
|
|
auto finditr = citr.second;
|
|
if (finditr != open_ttl_files_.begin()) --finditr;
|
|
|
|
bool b2 = (*finditr)->expiration_range_.second < expiration;
|
|
bool b1 = (*finditr)->expiration_range_.first > expiration;
|
|
|
|
return (b1 || b2) ? nullptr : (*finditr);
|
|
}
|
|
|
|
std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
|
|
const std::shared_ptr<BlobFile>& bfile) {
|
|
std::shared_ptr<Writer> writer = bfile->GetWriter();
|
|
if (writer) return writer;
|
|
|
|
Status s = CreateWriterLocked(bfile);
|
|
if (!s.ok()) return nullptr;
|
|
|
|
writer = bfile->GetWriter();
|
|
return writer;
|
|
}
|
|
|
|
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
|
|
{
|
|
ReadLock rl(&mutex_);
|
|
if (open_non_ttl_file_ != nullptr) {
|
|
return open_non_ttl_file_;
|
|
}
|
|
}
|
|
|
|
// CHECK again
|
|
WriteLock wl(&mutex_);
|
|
if (open_non_ttl_file_ != nullptr) {
|
|
return open_non_ttl_file_;
|
|
}
|
|
|
|
std::shared_ptr<BlobFile> bfile = NewBlobFile("SelectBlobFile");
|
|
assert(bfile);
|
|
|
|
// file not visible, hence no lock
|
|
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
|
|
if (!writer) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Failed to get writer from blob file: %s",
|
|
bfile->PathName().c_str());
|
|
return nullptr;
|
|
}
|
|
|
|
bfile->file_size_ = BlobLogHeader::kSize;
|
|
bfile->header_.compression = bdb_options_.compression;
|
|
bfile->header_.has_ttl = false;
|
|
bfile->header_.column_family_id =
|
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
|
bfile->header_valid_ = true;
|
|
bfile->SetColumnFamilyId(bfile->header_.column_family_id);
|
|
bfile->SetHasTTL(false);
|
|
bfile->SetCompression(bdb_options_.compression);
|
|
|
|
Status s = writer->WriteHeader(bfile->header_);
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Failed to write header to new blob file: %s"
|
|
" status: '%s'",
|
|
bfile->PathName().c_str(), s.ToString().c_str());
|
|
return nullptr;
|
|
}
|
|
|
|
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
|
|
open_non_ttl_file_ = bfile;
|
|
return bfile;
|
|
}
|
|
|
|
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
|
|
assert(expiration != kNoExpiration);
|
|
uint64_t epoch_read = 0;
|
|
std::shared_ptr<BlobFile> bfile;
|
|
{
|
|
ReadLock rl(&mutex_);
|
|
bfile = FindBlobFileLocked(expiration);
|
|
epoch_read = epoch_of_.load();
|
|
}
|
|
|
|
if (bfile) {
|
|
assert(!bfile->Immutable());
|
|
return bfile;
|
|
}
|
|
|
|
uint64_t exp_low =
|
|
(expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
|
|
uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
|
|
ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
|
|
|
|
bfile = NewBlobFile("SelectBlobFileTTL");
|
|
assert(bfile);
|
|
|
|
ROCKS_LOG_INFO(db_options_.info_log, "New blob file TTL range: %s %d %d",
|
|
bfile->PathName().c_str(), exp_low, exp_high);
|
|
LogFlush(db_options_.info_log);
|
|
|
|
// we don't need to take lock as no other thread is seeing bfile yet
|
|
std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
|
|
if (!writer) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Failed to get writer from blob file with TTL: %s",
|
|
bfile->PathName().c_str());
|
|
return nullptr;
|
|
}
|
|
|
|
bfile->header_.expiration_range = expiration_range;
|
|
bfile->header_.compression = bdb_options_.compression;
|
|
bfile->header_.has_ttl = true;
|
|
bfile->header_.column_family_id =
|
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
|
|
;
|
|
bfile->header_valid_ = true;
|
|
bfile->SetColumnFamilyId(bfile->header_.column_family_id);
|
|
bfile->SetHasTTL(true);
|
|
bfile->SetCompression(bdb_options_.compression);
|
|
bfile->file_size_ = BlobLogHeader::kSize;
|
|
|
|
// set the first value of the range, since that is
|
|
// concrete at this time. also necessary to add to open_ttl_files_
|
|
bfile->expiration_range_ = expiration_range;
|
|
|
|
WriteLock wl(&mutex_);
|
|
// in case the epoch has shifted in the interim, then check
|
|
// check condition again - should be rare.
|
|
if (epoch_of_.load() != epoch_read) {
|
|
auto bfile2 = FindBlobFileLocked(expiration);
|
|
if (bfile2) return bfile2;
|
|
}
|
|
|
|
Status s = writer->WriteHeader(bfile->header_);
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Failed to write header to new blob file: %s"
|
|
" status: '%s'",
|
|
bfile->PathName().c_str(), s.ToString().c_str());
|
|
return nullptr;
|
|
}
|
|
|
|
blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
|
|
open_ttl_files_.insert(bfile);
|
|
epoch_of_++;
|
|
|
|
return bfile;
|
|
}
|
|
|
|
// WriteBatch handler that rewrites a user batch into the batch that is
// actually written to the base DB. Puts go through
// BlobDBImpl::PutBlobValue(); deletes and range deletes are copied; log data
// is forwarded. Single-delete and merge are rejected, as is any operation on
// a column family other than the default one.
class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
 private:
  const WriteOptions& options_;
  BlobDBImpl* blob_db_impl_;
  uint32_t default_cf_id_;
  // Rewritten batch, built up while the source batch is iterated.
  WriteBatch batch_;

 public:
  BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
               uint32_t default_cf_id)
      : options_(options),
        blob_db_impl_(blob_db_impl),
        default_cf_id_(default_cf_id) {}

  // The rewritten batch; owned by this handler.
  WriteBatch* batch() { return &batch_; }

  // Extracts the expiration (if any) from the key/value and stores the value
  // through PutBlobValue(), which appends the resulting entry to batch_.
  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                       const Slice& value) override {
    if (column_family_id != default_cf_id_) {
      return Status::NotSupported(
          "Blob DB doesn't support non-default column family.");
    }
    std::string new_value;
    Slice value_slice;
    uint64_t expiration =
        blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value);
    Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice,
                                           expiration, &batch_);
    return s;
  }

  // Copies the delete into batch_.
  virtual Status DeleteCF(uint32_t column_family_id,
                          const Slice& key) override {
    if (column_family_id != default_cf_id_) {
      return Status::NotSupported(
          "Blob DB doesn't support non-default column family.");
    }
    Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
    return s;
  }

  // Copies the range delete into batch_. Kept under its original name for
  // existing callers; the Handler callback below forwards here.
  virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
                             const Slice& end_key) {
    if (column_family_id != default_cf_id_) {
      return Status::NotSupported(
          "Blob DB doesn't support non-default column family.");
    }
    Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
                                               begin_key, end_key);
    return s;
  }

  // WriteBatch::Handler reports range deletions through DeleteRangeCF().
  // Without this override they fall through to the base-class default and
  // never reach DeleteRange() above.
  virtual Status DeleteRangeCF(uint32_t column_family_id,
                               const Slice& begin_key,
                               const Slice& end_key) override {
    return DeleteRange(column_family_id, begin_key, end_key);
  }

  virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
                                const Slice& /*key*/) override {
    return Status::NotSupported("Not supported operation in blob db.");
  }

  virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
                         const Slice& /*value*/) override {
    return Status::NotSupported("Not supported operation in blob db.");
  }

  virtual void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
};
|
|
|
|
// Applies a user write batch: the batch is first rewritten through a
// BlobInserter while write_mutex_ is held, then the rewritten batch is
// written to the base DB. Returns the first failure from either step.
Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
  RecordTick(statistics_, BLOB_DB_NUM_WRITE);

  const uint32_t default_cf_id =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  BlobInserter inserter(options, this, default_cf_id);

  Status s;
  {
    // write_mutex_ is released before the DB write to avoid a race with the
    // flush begin listener, which also needs write_mutex_ to sync blob
    // files.
    MutexLock l(&write_mutex_);
    s = updates->Iterate(&inserter);
  }
  if (s.ok()) {
    return db_->Write(options, inserter.batch());
  }
  return s;
}
|
|
|
|
// Collects the live files of the base DB and appends the path of every blob
// file in blob_files_.
//
// ret:                receives the file names.
// manifest_file_size: forwarded to the base DB.
// flush_memtable:     forwarded to the base DB.
// Returns the base DB's failure status, or OK.
Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
                                uint64_t* manifest_file_size,
                                bool flush_memtable) {
  // Hold a lock in the beginning to avoid updates to base DB during the call
  ReadLock rl(&mutex_);
  Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
  if (!s.ok()) {
    return s;
  }
  ret.reserve(ret.size() + blob_files_.size());
  // Iterate by reference: copying each entry would also copy a shared_ptr
  // (an atomic ref-count round trip) per blob file.
  for (const auto& bfile_pair : blob_files_) {
    ret.emplace_back(bfile_pair.second->PathName());
  }
  return Status::OK();
}
|
|
|
|
void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
|
// Hold a lock in the beginning to avoid updates to base DB during the call
|
|
ReadLock rl(&mutex_);
|
|
db_->GetLiveFilesMetaData(metadata);
|
|
for (auto bfile_pair : blob_files_) {
|
|
auto blob_file = bfile_pair.second;
|
|
LiveFileMetaData filemetadata;
|
|
filemetadata.size = blob_file->GetFileSize();
|
|
filemetadata.name = blob_file->PathName();
|
|
auto cfh =
|
|
reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
|
|
filemetadata.column_family_name = cfh->GetName();
|
|
metadata->emplace_back(filemetadata);
|
|
}
|
|
}
|
|
|
|
// Stores key/value. The expiration, and possibly a rewritten value, is
// obtained from ExtractExpiration(); the write itself is done by PutUntil().
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
                       const Slice& value) {
  // Backing storage for the value slice in case the value is rewritten.
  std::string rewritten_value;
  Slice effective_value;
  const uint64_t expiration =
      ExtractExpiration(key, value, &effective_value, &rewritten_value);
  return PutUntil(options, key, effective_value, expiration);
}
|
|
|
|
// Stores key/value with a time-to-live relative to the current epoch time.
// When now + ttl would reach or exceed kNoExpiration, the entry is written
// with kNoExpiration instead.
Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
                              const Slice& key, const Slice& value,
                              uint64_t ttl) {
  const uint64_t now = EpochNow();
  uint64_t expiration = kNoExpiration;
  // Only add when the sum stays strictly below kNoExpiration.
  if (kNoExpiration - now > ttl) {
    expiration = now + ttl;
  }
  return PutUntil(options, key, value, expiration);
}
|
|
|
|
// Stores key/value with an absolute expiration. The value is first stored
// through PutBlobValue() while write_mutex_ is held; the resulting batch is
// then written to the base DB after the mutex has been released.
Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
                            const Slice& value, uint64_t expiration) {
  StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
  RecordTick(statistics_, BLOB_DB_NUM_PUT);
  TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");

  WriteBatch index_batch;
  Status s;
  {
    // write_mutex_ is released before the DB write to avoid a race with the
    // flush begin listener, which also needs write_mutex_ to sync blob
    // files.
    MutexLock l(&write_mutex_);
    s = PutBlobValue(options, key, value, expiration, &index_batch);
  }
  if (s.ok()) {
    s = db_->Write(options, &index_batch);
  }

  TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
  return s;
}
|
|
|
|
// Stores one key/value and appends the corresponding entry to `batch`.
//
// Values smaller than min_blob_size are kept inline: as a plain Put when
// there is no expiration, otherwise as an inlined-TTL blob index. Larger
// values are (optionally compressed and) appended to a blob file selected by
// expiration, and a blob index pointing at the record is put into `batch`.
//
// expiration: absolute expiration, or kNoExpiration.
// batch:      receives the entry to write to the base DB.
// Returns NotFound when no blob file could be selected, otherwise the first
// failure from appending, closing the blob file, or updating the batch.
// The write statistics at the end are recorded regardless of the status.
Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
                                const Slice& key, const Slice& value,
                                uint64_t expiration, WriteBatch* batch) {
  Status s;
  std::string index_entry;
  uint32_t column_family_id =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  if (value.size() < bdb_options_.min_blob_size) {
    if (expiration == kNoExpiration) {
      // Put as normal value
      s = batch->Put(key, value);
      RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
    } else {
      // Inlined with TTL
      BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
      s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
                                           index_entry);
      RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
    }
  } else {
    std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
                                          ? SelectBlobFileTTL(expiration)
                                          : SelectBlobFile();
    if (!bfile) {
      return Status::NotFound("Blob file not found");
    }

    assert(bfile->compression() == bdb_options_.compression);
    std::string compression_output;
    Slice value_compressed = GetCompressedSlice(value, &compression_output);

    std::string headerbuf;
    Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);

    s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
                   &index_entry);
    if (expiration == kNoExpiration) {
      RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
    } else {
      RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
    }

    if (s.ok()) {
      if (expiration != kNoExpiration) {
        bfile->ExtendExpirationRange(expiration);
      }
      s = CloseBlobFileIfNeeded(bfile);
      if (s.ok()) {
        s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
                                             index_entry);
      }
    } else {
      // value.size() is a size_t, so it needs the size_t conversion rather
      // than %d.
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Failed to append blob to FILE: %s: KEY: %s VALSZ: "
                      "%" ROCKSDB_PRIszt " status: '%s' blob_file: '%s'",
                      bfile->PathName().c_str(), key.ToString().c_str(),
                      value.size(), s.ToString().c_str(),
                      bfile->DumpState().c_str());
    }
  }

  RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
  RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
  MeasureTime(statistics_, BLOB_DB_KEY_SIZE, key.size());
  MeasureTime(statistics_, BLOB_DB_VALUE_SIZE, value.size());

  return s;
}
|
|
|
|
// Compresses `raw` with the configured blob compression type.
//
// With kNoCompression the input slice is returned untouched and
// `compression_output` is not used. Otherwise the compressed bytes are
// written to `compression_output` and the returned slice refers to that
// string.
Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
                                     std::string* compression_output) const {
  if (bdb_options_.compression != kNoCompression) {
    StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
    CompressionType type = bdb_options_.compression;
    CompressionOptions opts;
    CompressBlock(raw, opts, &type, kBlockBasedTableVersionFormat, Slice(),
                  compression_output);
    return *compression_output;
  }
  return raw;
}
|
|
|
|
uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
|
|
Slice* value_slice,
|
|
std::string* new_value) {
|
|
uint64_t expiration = kNoExpiration;
|
|
bool has_expiration = false;
|
|
bool value_changed = false;
|
|
if (ttl_extractor_ != nullptr) {
|
|
has_expiration = ttl_extractor_->ExtractExpiration(
|
|
key, value, EpochNow(), &expiration, new_value, &value_changed);
|
|
}
|
|
*value_slice = value_changed ? Slice(*new_value) : value;
|
|
return has_expiration ? expiration : kNoExpiration;
|
|
}
|
|
|
|
std::shared_ptr<BlobFile> BlobDBImpl::GetOldestBlobFile() {
|
|
std::vector<std::shared_ptr<BlobFile>> blob_files;
|
|
CopyBlobFiles(&blob_files, [](const std::shared_ptr<BlobFile>& f) {
|
|
return !f->Obsolete() && f->Immutable();
|
|
});
|
|
if (blob_files.empty()) {
|
|
return nullptr;
|
|
}
|
|
blobf_compare_ttl compare;
|
|
return *std::min_element(blob_files.begin(), blob_files.end(), compare);
|
|
}
|
|
|
|
bool BlobDBImpl::EvictOldestBlobFile() {
|
|
auto oldest_file = GetOldestBlobFile();
|
|
if (oldest_file == nullptr) {
|
|
return false;
|
|
}
|
|
|
|
WriteLock wl(&mutex_);
|
|
// Double check the file is not obsolete by others
|
|
if (oldest_file_evicted_ == false && !oldest_file->Obsolete()) {
|
|
auto expiration_range = oldest_file->GetExpirationRange();
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"Evict oldest blob file since DB out of space. Current "
|
|
"space used: %" PRIu64 ", blob dir size: %" PRIu64
|
|
", evicted blob file #%" PRIu64
|
|
" with expiration range (%" PRIu64 ", %" PRIu64 ").",
|
|
total_blob_space_.load(), bdb_options_.blob_dir_size,
|
|
oldest_file->BlobFileNumber(), expiration_range.first,
|
|
expiration_range.second);
|
|
oldest_file->MarkObsolete(GetLatestSequenceNumber());
|
|
obsolete_files_.push_back(oldest_file);
|
|
oldest_file_evicted_.store(true);
|
|
RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
|
|
RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
|
|
oldest_file->BlobCount());
|
|
RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
|
|
oldest_file->GetFileSize());
|
|
TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
// Checks whether writing `blob_size` more bytes fits the configured
// blob_dir_size limit. A limit of zero disables the check.
//
// Without FIFO mode, exceeding the limit yields NoSpace. In FIFO mode the
// write is always allowed; once usage would pass the
// kEvictOldestFileAtSize fraction of the limit (and no eviction is flagged
// yet), the oldest blob file is evicted.
Status BlobDBImpl::CheckSize(size_t blob_size) {
  const uint64_t projected_usage = total_blob_space_.load() + blob_size;
  if (bdb_options_.blob_dir_size > 0) {
    if (!bdb_options_.is_fifo) {
      if (projected_usage > bdb_options_.blob_dir_size) {
        return Status::NoSpace(
            "Write failed, as writing it would exceed blob_dir_size limit.");
      }
    } else if (!oldest_file_evicted_.load() &&
               (projected_usage >
                kEvictOldestFileAtSize * bdb_options_.blob_dir_size)) {
      EvictOldestBlobFile();
    }
  }

  return Status::OK();
}
|
|
|
|
// Appends one record (header, key, value) to `bfile` and encodes the blob
// index that points at it.
//
// headerbuf:   pre-built record header.
// expiration:  kNoExpiration selects the plain blob index encoding, anything
//              else the TTL encoding.
// index_entry: receives the encoded blob index.
// Returns the CheckSize() failure, IOError when no writer is available, or
// the failure from writing the record.
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
                              const std::string& headerbuf, const Slice& key,
                              const Slice& value, uint64_t expiration,
                              std::string* index_entry) {
  auto size_put = BlobLogRecord::kHeaderSize + key.size() + value.size();
  Status s = CheckSize(size_put);
  if (!s.ok()) {
    return s;
  }

  uint64_t blob_offset = 0;
  uint64_t key_offset = 0;
  {
    // The record is written while holding the blob file's own mutex.
    WriteLock lockbfile_w(&bfile->mutex_);
    std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
    if (!writer) {
      return Status::IOError("Failed to create blob writer");
    }

    // write the blob to the blob log.
    s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
                                   &blob_offset);
  }

  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Invalid status in AppendBlob: %s status: '%s'",
                    bfile->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  // Bookkeeping: record count, file size and total blob space.
  bfile->blob_count_++;
  bfile->file_size_ += size_put;
  total_blob_space_ += size_put;

  if (expiration != kNoExpiration) {
    BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
                             blob_offset, value.size(),
                             bdb_options_.compression);
  } else {
    BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
                          value.size(), bdb_options_.compression);
  }

  return s;
}
|
|
|
|
std::vector<Status> BlobDBImpl::MultiGet(
|
|
const ReadOptions& read_options,
|
|
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
|
StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
|
|
RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
|
|
// Get a snapshot to avoid blob file get deleted between we
|
|
// fetch and index entry and reading from the file.
|
|
ReadOptions ro(read_options);
|
|
bool snapshot_created = SetSnapshotIfNeeded(&ro);
|
|
|
|
std::vector<Status> statuses;
|
|
statuses.reserve(keys.size());
|
|
values->clear();
|
|
values->reserve(keys.size());
|
|
PinnableSlice value;
|
|
for (size_t i = 0; i < keys.size(); i++) {
|
|
statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
|
|
values->push_back(value.ToString());
|
|
value.Reset();
|
|
}
|
|
if (snapshot_created) {
|
|
db_->ReleaseSnapshot(ro.snapshot);
|
|
}
|
|
return statuses;
|
|
}
|
|
|
|
// Ensures `read_options` carries a snapshot. Returns true when a snapshot
// was taken from the base DB by this call, false when one was already set.
bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
  assert(read_options != nullptr);
  const bool needs_snapshot = (read_options->snapshot == nullptr);
  if (needs_snapshot) {
    read_options->snapshot = db_->GetSnapshot();
  }
  return needs_snapshot;
}
|
|
|
|
// Resolves the blob index `index_entry` stored for `key` into the actual
// value and pins it into `value`.
//
// Returns:
//  - the decode error if `index_entry` is not a valid blob index;
//  - NotFound if the entry has a TTL that has expired, if the recorded
//    offset is too small to be valid, or if the blob file is no longer
//    tracked in blob_files_;
//  - the read error, or Corruption on a short read, an undecodable checksum
//    or a CRC mismatch;
//  - otherwise the status of the (optional) decompression step.
Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
                                PinnableSlice* value) {
  assert(value != nullptr);
  BlobIndex blob_index;
  Status s = blob_index.DecodeFrom(index_entry);
  if (!s.ok()) {
    return s;
  }
  if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
    return Status::NotFound("Key expired");
  }
  if (blob_index.IsInlined()) {
    // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
    // memory buffer to avoid extra copy.
    value->PinSelf(blob_index.value());
    return Status::OK();
  }
  if (blob_index.size() == 0) {
    value->PinSelf("");
    return Status::OK();
  }

  // offset has to have certain min, as we will read CRC
  // later from the Blob Header, which needs to be also a
  // valid offset.
  if (blob_index.offset() <
      (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
    if (debug_level_ >= 2) {
      // The key goes through ToString(): nothing here guarantees that
      // key.data() is NUL-terminated, so it cannot be passed to %s directly.
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Invalid blob index file_number: %" PRIu64
                      " blob_offset: %" PRIu64 " blob_size: %" PRIu64
                      " key: %s",
                      blob_index.file_number(), blob_index.offset(),
                      blob_index.size(), key.ToString().c_str());
    }
    return Status::NotFound("Invalid blob offset");
  }

  std::shared_ptr<BlobFile> bfile;
  {
    ReadLock rl(&mutex_);
    auto hitr = blob_files_.find(blob_index.file_number());

    // file was deleted
    if (hitr == blob_files_.end()) {
      return Status::NotFound("Blob Not Found as blob file missing");
    }

    bfile = hitr->second;
  }

  // takes locks when called
  std::shared_ptr<RandomAccessFileReader> reader =
      GetOrOpenRandomAccessReader(bfile, env_, env_options_);

  // The index points at the blob itself; the partial record we read starts
  // at the checksum, which sits sizeof(uint32_t) + key.size() bytes earlier.
  assert(blob_index.offset() > key.size() + sizeof(uint32_t));
  uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
  uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();

  // Allocate the buffer. This is safe in C++11
  std::string buffer_str(record_size, static_cast<char>(0));
  char* buffer = &buffer_str[0];

  // A partial blob record contain checksum, key and value.
  Slice blob_record;
  {
    StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
    s = reader->Read(record_offset, record_size, &blob_record, buffer);
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
  }
  if (!s.ok()) {
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Failed to read blob from blob file %" PRIu64
                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
                    ", key_size: %" PRIu64 ", status: '%s'",
                    bfile->BlobFileNumber(), blob_index.offset(),
                    blob_index.size(), static_cast<uint64_t>(key.size()),
                    s.ToString().c_str());
    return s;
  }
  if (blob_record.size() != record_size) {
    // The read succeeded but came back short; report actual vs. expected.
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Failed to read blob from blob file %" PRIu64
                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
                    ", key_size: %" PRIu64 ", read %" PRIu64
                    " bytes, expected %" PRIu64 " bytes",
                    bfile->BlobFileNumber(), blob_index.offset(),
                    blob_index.size(), static_cast<uint64_t>(key.size()),
                    static_cast<uint64_t>(blob_record.size()), record_size);

    return Status::Corruption("Failed to retrieve blob from blob index.");
  }
  Slice crc_slice(blob_record.data(), sizeof(uint32_t));
  Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
                   blob_index.size());
  uint32_t crc_exp;
  if (!GetFixed32(&crc_slice, &crc_exp)) {
    ROCKS_LOG_DEBUG(db_options_.info_log,
                    "Unable to decode CRC from blob file %" PRIu64
                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
                    ", key size: %" PRIu64 ", status: '%s'",
                    bfile->BlobFileNumber(), blob_index.offset(),
                    blob_index.size(), static_cast<uint64_t>(key.size()),
                    s.ToString().c_str());
    return Status::Corruption("Unable to decode checksum.");
  }

  // The stored checksum covers everything after the checksum field itself
  // (key followed by value).
  uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
                               blob_record.size() - sizeof(uint32_t));
  crc = crc32c::Mask(crc);  // Adjust for storage
  if (crc != crc_exp) {
    if (debug_level_ >= 2) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Blob crc mismatch file: %s blob_offset: %" PRIu64
                      " blob_size: %" PRIu64 " key: %s status: '%s'",
                      bfile->PathName().c_str(), blob_index.offset(),
                      blob_index.size(), key.ToString().c_str(),
                      s.ToString().c_str());
    }
    return Status::Corruption("Corruption. Blob CRC mismatch");
  }

  if (bfile->compression() == kNoCompression) {
    value->PinSelf(blob_value);
  } else {
    BlockContents contents;
    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
    {
      StopWatch decompression_sw(env_, statistics_,
                                 BLOB_DB_DECOMPRESSION_MICROS);
      s = UncompressBlockContentsForCompressionType(
          blob_value.data(), blob_value.size(), &contents,
          kBlockBasedTableVersionFormat, Slice(), bfile->compression(),
          *(cfh->cfd()->ioptions()));
    }
    value->PinSelf(contents.data);
  }

  return s;
}
|
|
|
|
// Public point lookup: times the call, bumps the Get counter and delegates
// the actual work to GetImpl().
Status BlobDBImpl::Get(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* value) {
  StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
  RecordTick(statistics_, BLOB_DB_NUM_GET);
  Status lookup_status = GetImpl(read_options, column_family, key, value);
  return lookup_status;
}
|
|
|
|
// Reads `key` from the base DB and, when the stored entry is a blob index,
// resolves it to the blob value via GetBlobValue(). Only the default column
// family is accepted; any other handle yields NotSupported.
Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           PinnableSlice* value) {
  if (column_family != DefaultColumnFamily()) {
    return Status::NotSupported(
        "Blob DB doesn't support non-default column family.");
  }

  // Read through a snapshot so that a blob file cannot be deleted between
  // fetching the index entry and reading the blob from the file.
  // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
  ReadOptions pinned_options(read_options);
  const bool owns_snapshot = SetSnapshotIfNeeded(&pinned_options);

  bool found_blob_index = false;
  Status status = db_impl_->GetImpl(pinned_options, column_family, key, value,
                                    nullptr /*value_found*/,
                                    nullptr /*read_callback*/,
                                    &found_blob_index);
  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");

  if (status.ok() && found_blob_index) {
    // `value` currently holds the encoded index; copy it out so `value` can
    // be reused as the destination for the resolved blob.
    const std::string encoded_index = value->ToString();
    value->Reset();
    status = GetBlobValue(key, encoded_index, value);
  }

  if (owns_snapshot) {
    db_->ReleaseSnapshot(pinned_options.snapshot);
  }

  RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
  RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
  return status;
}
|
|
|
|
std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
|
|
if (aborted) return std::make_pair(false, -1);
|
|
|
|
ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
|
|
|
|
ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" PRIu64,
|
|
blob_files_.size());
|
|
|
|
ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" PRIu64,
|
|
open_ttl_files_.size());
|
|
|
|
for (auto bfile : open_ttl_files_) {
|
|
assert(!bfile->Immutable());
|
|
}
|
|
|
|
uint64_t epoch_now = EpochNow();
|
|
|
|
for (auto bfile_pair : blob_files_) {
|
|
auto bfile = bfile_pair.second;
|
|
ROCKS_LOG_INFO(
|
|
db_options_.info_log, "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64,
|
|
bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(),
|
|
(bfile->expiration_range_.second - epoch_now));
|
|
}
|
|
|
|
// reschedule
|
|
return std::make_pair(true, -1);
|
|
}
|
|
|
|
// Stops tracking `bfile` as an open file and, if it has not been closed
// yet, writes its footer and closes it.
//
// Under mutex_ the file is removed from open_ttl_files_ (TTL files) or
// open_non_ttl_file_ is cleared (non-TTL file). Returns the status of
// WriteFooterAndCloseLocked(), or OK if the file was already closed.
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
  assert(bfile != nullptr);
  Status s;
  ROCKS_LOG_INFO(db_options_.info_log,
                 "Closing blob file %" PRIu64 ". Path: %s",
                 bfile->BlobFileNumber(), bfile->PathName().c_str());
  {
    WriteLock wl(&mutex_);

    if (bfile->HasTTL()) {
      // `erased` is only consumed by the assert below.
      size_t erased __attribute__((__unused__));
      erased = open_ttl_files_.erase(bfile);
      assert(erased == 1);
    } else {
      assert(bfile == open_non_ttl_file_);
      open_non_ttl_file_ = nullptr;
    }
  }

  if (!bfile->closed_.load()) {
    WriteLock lockbfile_w(&bfile->mutex_);
    s = bfile->WriteFooterAndCloseLocked();
  }

  if (!s.ok()) {
    // A separating space is needed between the file number and "with".
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to close blob file %" PRIu64 " with error: %s",
                    bfile->BlobFileNumber(), s.ToString().c_str());
  }

  return s;
}
|
|
|
|
// Closes `bfile` once its size has reached bdb_options_.blob_file_size;
// returns OK without doing anything while the file is still below that
// limit.
Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
  // NOTE(review): the original comment calls GetFileSize() an atomic read;
  // that is not verifiable from this file.
  const bool reached_size_limit =
      bfile->GetFileSize() >= bdb_options_.blob_file_size;
  return reached_size_limit ? CloseBlobFile(bfile) : Status::OK();
}
|
|
|
|
bool BlobDBImpl::VisibleToActiveSnapshot(
|
|
const std::shared_ptr<BlobFile>& bfile) {
|
|
assert(bfile->Obsolete());
|
|
|
|
// We check whether the oldest snapshot is no less than the last sequence
|
|
// by the time the blob file become obsolete. If so, the blob file is not
|
|
// visible to all existing snapshots.
|
|
//
|
|
// If we keep track of the earliest sequence of the keys in the blob file,
|
|
// we could instead check if there's a snapshot falls in range
|
|
// [earliest_sequence, obsolete_sequence). But doing so will make the
|
|
// implementation more complicated.
|
|
SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
|
|
SequenceNumber oldest_snapshot = 0;
|
|
{
|
|
// Need to lock DBImpl mutex before access snapshot list.
|
|
InstrumentedMutexLock l(db_impl_->mutex());
|
|
auto snapshots = db_impl_->snapshots();
|
|
if (!snapshots.empty()) {
|
|
oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
|
|
}
|
|
}
|
|
return oldest_snapshot < obsolete_sequence;
|
|
}
|
|
|
|
std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
|
|
if (aborted) return std::make_pair(false, -1);
|
|
|
|
std::vector<std::shared_ptr<BlobFile>> process_files;
|
|
{
|
|
uint64_t epoch_now = EpochNow();
|
|
|
|
ReadLock rl(&mutex_);
|
|
for (auto bfile : open_ttl_files_) {
|
|
{
|
|
ReadLock lockbfile_r(&bfile->mutex_);
|
|
|
|
if (bfile->expiration_range_.second > epoch_now) continue;
|
|
process_files.push_back(bfile);
|
|
}
|
|
}
|
|
}
|
|
|
|
for (auto bfile : process_files) {
|
|
CloseBlobFile(bfile);
|
|
}
|
|
|
|
return std::make_pair(true, -1);
|
|
}
|
|
|
|
// Fsyncs every currently open blob file (TTL and non-TTL) and then the blob
// directory. Stops at, logs and returns the first failure; returns the
// directory sync status otherwise.
Status BlobDBImpl::SyncBlobFiles() {
  MutexLock l(&write_mutex_);

  // Snapshot the set of open files under mutex_, then sync without it.
  std::vector<std::shared_ptr<BlobFile>> open_files;
  {
    ReadLock rl(&mutex_);
    open_files.insert(open_files.end(), open_ttl_files_.begin(),
                      open_ttl_files_.end());
    if (open_non_ttl_file_ != nullptr) {
      open_files.push_back(open_non_ttl_file_);
    }
  }

  for (const auto& file : open_files) {
    Status file_status = file->Fsync();
    if (!file_status.ok()) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Failed to sync blob file %" PRIu64 ", status: %s",
                      file->BlobFileNumber(), file_status.ToString().c_str());
      return file_status;
    }
  }

  Status dir_status = dir_ent_->Fsync();
  if (!dir_status.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to sync blob directory, status: %s",
                    dir_status.ToString().c_str());
  }
  return dir_status;
}
|
|
|
|
std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
|
|
if (aborted) return std::make_pair(false, -1);
|
|
|
|
if (open_file_count_.load() < kOpenFilesTrigger) {
|
|
return std::make_pair(true, -1);
|
|
}
|
|
|
|
// in the future, we should sort by last_access_
|
|
// instead of closing every file
|
|
ReadLock rl(&mutex_);
|
|
for (auto const& ent : blob_files_) {
|
|
auto bfile = ent.second;
|
|
if (bfile->last_access_.load() == -1) continue;
|
|
|
|
WriteLock lockbfile_w(&bfile->mutex_);
|
|
CloseRandomAccessLocked(bfile);
|
|
}
|
|
|
|
return std::make_pair(true, -1);
|
|
}
|
|
|
|
// Write callback for garbage collection to check if key has been updated
|
|
// since last read. Similar to how OptimisticTransaction works. See inline
|
|
// comment in GCFileAndUpdateLSM().
|
|
class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
|
|
public:
|
|
GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
|
|
SequenceNumber upper_bound)
|
|
: cfd_(cfd), key_(key), upper_bound_(upper_bound) {}
|
|
|
|
virtual Status Callback(DB* db) override {
|
|
auto* db_impl = reinterpret_cast<DBImpl*>(db);
|
|
auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
|
|
SequenceNumber latest_seq = 0;
|
|
bool found_record_for_key = false;
|
|
bool is_blob_index = false;
|
|
Status s = db_impl->GetLatestSequenceForKey(
|
|
sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
|
|
&is_blob_index);
|
|
db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
|
|
if (!s.ok() && !s.IsNotFound()) {
|
|
// Error.
|
|
assert(!s.IsBusy());
|
|
return s;
|
|
}
|
|
if (s.IsNotFound()) {
|
|
assert(!found_record_for_key);
|
|
return Status::Busy("Key deleted");
|
|
}
|
|
assert(found_record_for_key);
|
|
assert(is_blob_index);
|
|
if (latest_seq > upper_bound_) {
|
|
return Status::Busy("Key overwritten");
|
|
}
|
|
return s;
|
|
}
|
|
|
|
virtual bool AllowWriteBatching() override { return false; }
|
|
|
|
private:
|
|
ColumnFamilyData* cfd_;
|
|
// Key to check
|
|
Slice key_;
|
|
// Upper bound of sequence number to proceed.
|
|
SequenceNumber upper_bound_;
|
|
};
|
|
|
|
// iterate over the blobs sequentially and check if the blob sequence number
|
|
// is the latest. If it is the latest, preserve it, otherwise delete it
|
|
// if it is TTL based, and the TTL has expired, then
|
|
// we can blow the entity if the key is still the latest or the Key is not
|
|
// found
|
|
// WHAT HAPPENS IF THE KEY HAS BEEN OVERRIDEN. Then we can drop the blob
|
|
// without doing anything if the earliest snapshot is not
|
|
// referring to that sequence number, i.e. it is later than the sequence number
|
|
// of the new key
|
|
//
|
|
// if it is not TTL based, then we can blow the key if the key has been
|
|
// DELETED in the LSM
|
|
Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|
GCStats* gc_stats) {
|
|
StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS);
|
|
uint64_t now = EpochNow();
|
|
|
|
std::shared_ptr<Reader> reader =
|
|
bfptr->OpenSequentialReader(env_, db_options_, env_options_);
|
|
if (!reader) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"File sequential reader could not be opened",
|
|
bfptr->PathName().c_str());
|
|
return Status::IOError("failed to create sequential reader");
|
|
}
|
|
|
|
BlobLogHeader header;
|
|
Status s = reader->ReadHeader(&header);
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Failure to read header for blob-file %s",
|
|
bfptr->PathName().c_str());
|
|
return s;
|
|
}
|
|
|
|
auto* cfh =
|
|
db_impl_->GetColumnFamilyHandleUnlocked(bfptr->column_family_id());
|
|
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
|
|
auto column_family_id = cfd->GetID();
|
|
bool has_ttl = header.has_ttl;
|
|
|
|
// this reads the key but skips the blob
|
|
Reader::ReadLevel shallow = Reader::kReadHeaderKey;
|
|
|
|
bool file_expired = has_ttl && now >= bfptr->GetExpirationRange().second;
|
|
|
|
if (!file_expired) {
|
|
// read the blob because you have to write it back to new file
|
|
shallow = Reader::kReadHeaderKeyBlob;
|
|
}
|
|
|
|
BlobLogRecord record;
|
|
std::shared_ptr<BlobFile> newfile;
|
|
std::shared_ptr<Writer> new_writer;
|
|
uint64_t blob_offset = 0;
|
|
|
|
while (true) {
|
|
assert(s.ok());
|
|
|
|
// Read the next blob record.
|
|
Status read_record_status =
|
|
reader->ReadRecord(&record, shallow, &blob_offset);
|
|
// Exit if we reach the end of blob file.
|
|
// TODO(yiwu): properly handle ReadRecord error.
|
|
if (!read_record_status.ok()) {
|
|
break;
|
|
}
|
|
gc_stats->blob_count++;
|
|
|
|
// Similar to OptimisticTransaction, we obtain latest_seq from
|
|
// base DB, which is guaranteed to be no smaller than the sequence of
|
|
// current key. We use a WriteCallback on write to check the key sequence
|
|
// on write. If the key sequence is larger than latest_seq, we know
|
|
// a new versions is inserted and the old blob can be disgard.
|
|
//
|
|
// We cannot use OptimisticTransaction because we need to pass
|
|
// is_blob_index flag to GetImpl.
|
|
SequenceNumber latest_seq = GetLatestSequenceNumber();
|
|
bool is_blob_index = false;
|
|
PinnableSlice index_entry;
|
|
Status get_status = db_impl_->GetImpl(
|
|
ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
|
|
nullptr /*read_callback*/, &is_blob_index);
|
|
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
|
|
if (!get_status.ok() && !get_status.IsNotFound()) {
|
|
// error
|
|
s = get_status;
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Error while getting index entry: %s",
|
|
s.ToString().c_str());
|
|
break;
|
|
}
|
|
if (get_status.IsNotFound() || !is_blob_index) {
|
|
// Either the key is deleted or updated with a newer version whish is
|
|
// inlined in LSM.
|
|
gc_stats->num_keys_overwritten++;
|
|
gc_stats->bytes_overwritten += record.record_size();
|
|
continue;
|
|
}
|
|
|
|
BlobIndex blob_index;
|
|
s = blob_index.DecodeFrom(index_entry);
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Error while decoding index entry: %s",
|
|
s.ToString().c_str());
|
|
break;
|
|
}
|
|
if (blob_index.IsInlined() ||
|
|
blob_index.file_number() != bfptr->BlobFileNumber() ||
|
|
blob_index.offset() != blob_offset) {
|
|
// Key has been overwritten. Drop the blob record.
|
|
gc_stats->num_keys_overwritten++;
|
|
gc_stats->bytes_overwritten += record.record_size();
|
|
continue;
|
|
}
|
|
|
|
GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
|
|
|
|
// If key has expired, remove it from base DB.
|
|
// TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
|
|
// We can just drop the blob record.
|
|
if (file_expired || (has_ttl && now >= record.expiration)) {
|
|
gc_stats->num_keys_expired++;
|
|
gc_stats->bytes_expired += record.record_size();
|
|
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
|
|
WriteBatch delete_batch;
|
|
Status delete_status = delete_batch.Delete(record.key);
|
|
if (delete_status.ok()) {
|
|
delete_status = db_impl_->WriteWithCallback(WriteOptions(),
|
|
&delete_batch, &callback);
|
|
}
|
|
if (!delete_status.ok() && !delete_status.IsBusy()) {
|
|
// We hit an error.
|
|
s = delete_status;
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"Error while deleting expired key: %s",
|
|
s.ToString().c_str());
|
|
break;
|
|
}
|
|
// Continue to next blob record or retry.
|
|
continue;
|
|
}
|
|
|
|
// Relocate the blob record to new file.
|
|
if (!newfile) {
|
|
// new file
|
|
std::string reason("GC of ");
|
|
reason += bfptr->PathName();
|
|
newfile = NewBlobFile(reason);
|
|
|
|
new_writer = CheckOrCreateWriterLocked(newfile);
|
|
newfile->header_ = std::move(header);
|
|
// Can't use header beyond this point
|
|
newfile->header_valid_ = true;
|
|
newfile->file_size_ = BlobLogHeader::kSize;
|
|
s = new_writer->WriteHeader(newfile->header_);
|
|
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"File: %s - header writing failed",
|
|
newfile->PathName().c_str());
|
|
break;
|
|
}
|
|
|
|
WriteLock wl(&mutex_);
|
|
|
|
blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
|
|
}
|
|
|
|
std::string new_index_entry;
|
|
uint64_t new_blob_offset = 0;
|
|
uint64_t new_key_offset = 0;
|
|
// write the blob to the blob log.
|
|
s = new_writer->AddRecord(record.key, record.value, record.expiration,
|
|
&new_key_offset, &new_blob_offset);
|
|
|
|
BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
|
|
new_blob_offset, record.value.size(),
|
|
bdb_options_.compression);
|
|
|
|
newfile->blob_count_++;
|
|
newfile->file_size_ +=
|
|
BlobLogRecord::kHeaderSize + record.key.size() + record.value.size();
|
|
|
|
TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
|
|
WriteBatch rewrite_batch;
|
|
Status rewrite_status = WriteBatchInternal::PutBlobIndex(
|
|
&rewrite_batch, column_family_id, record.key, new_index_entry);
|
|
if (rewrite_status.ok()) {
|
|
rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
|
|
&rewrite_batch, &callback);
|
|
}
|
|
if (rewrite_status.ok()) {
|
|
gc_stats->num_keys_relocated++;
|
|
gc_stats->bytes_relocated += record.record_size();
|
|
} else if (rewrite_status.IsBusy()) {
|
|
// The key is overwritten in the meanwhile. Drop the blob record.
|
|
gc_stats->num_keys_overwritten++;
|
|
gc_stats->bytes_overwritten += record.record_size();
|
|
} else {
|
|
// We hit an error.
|
|
s = rewrite_status;
|
|
ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
|
|
s.ToString().c_str());
|
|
break;
|
|
}
|
|
} // end of ReadRecord loop
|
|
|
|
if (s.ok()) {
|
|
bfptr->MarkObsolete(GetLatestSequenceNumber());
|
|
{
|
|
WriteLock wl(&mutex_);
|
|
obsolete_files_.push_back(bfptr);
|
|
}
|
|
}
|
|
|
|
ROCKS_LOG_INFO(
|
|
db_options_.info_log,
|
|
"%s blob file %" PRIu64 ". Total blob records: %" PRIu64
|
|
", expired: %" PRIu64 " keys/%" PRIu64
|
|
" bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64
|
|
" bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.",
|
|
s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
|
|
bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired,
|
|
gc_stats->bytes_expired, gc_stats->num_keys_overwritten,
|
|
gc_stats->bytes_overwritten, gc_stats->num_keys_relocated,
|
|
gc_stats->bytes_relocated);
|
|
RecordTick(statistics_, BLOB_DB_GC_NUM_FILES);
|
|
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
|
|
gc_stats->num_keys_overwritten);
|
|
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED,
|
|
gc_stats->num_keys_expired);
|
|
RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN,
|
|
gc_stats->bytes_overwritten);
|
|
RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
|
|
if (newfile != nullptr) {
|
|
total_blob_space_ += newfile->file_size_;
|
|
ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
|
|
newfile->BlobFileNumber());
|
|
RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
|
|
RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
|
|
gc_stats->num_keys_relocated);
|
|
RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED,
|
|
gc_stats->bytes_relocated);
|
|
}
|
|
if (!s.ok()) {
|
|
RecordTick(statistics_, BLOB_DB_GC_FAILURES);
|
|
}
|
|
return s;
|
|
}
|
|
|
|
std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
|
if (aborted) return std::make_pair(false, -1);
|
|
|
|
{
|
|
ReadLock rl(&mutex_);
|
|
if (obsolete_files_.empty()) return std::make_pair(true, -1);
|
|
}
|
|
|
|
std::list<std::shared_ptr<BlobFile>> tobsolete;
|
|
{
|
|
WriteLock wl(&mutex_);
|
|
tobsolete.swap(obsolete_files_);
|
|
}
|
|
|
|
bool file_deleted = false;
|
|
for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
|
|
auto bfile = *iter;
|
|
{
|
|
ReadLock lockbfile_r(&bfile->mutex_);
|
|
if (VisibleToActiveSnapshot(bfile)) {
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"Could not delete file due to snapshot failure %s",
|
|
bfile->PathName().c_str());
|
|
++iter;
|
|
continue;
|
|
}
|
|
}
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"Will delete file due to snapshot success %s",
|
|
bfile->PathName().c_str());
|
|
|
|
blob_files_.erase(bfile->BlobFileNumber());
|
|
Status s = env_->DeleteFile(bfile->PathName());
|
|
if (!s.ok()) {
|
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
|
"File failed to be deleted as obsolete %s",
|
|
bfile->PathName().c_str());
|
|
++iter;
|
|
continue;
|
|
}
|
|
|
|
file_deleted = true;
|
|
total_blob_space_ -= bfile->file_size_;
|
|
ROCKS_LOG_INFO(db_options_.info_log,
|
|
"File deleted as obsolete from blob dir %s",
|
|
bfile->PathName().c_str());
|
|
|
|
iter = tobsolete.erase(iter);
|
|
}
|
|
|
|
// directory change. Fsync
|
|
if (file_deleted) {
|
|
dir_ent_->Fsync();
|
|
|
|
// reset oldest_file_evicted flag
|
|
oldest_file_evicted_.store(false);
|
|
}
|
|
|
|
// put files back into obsolete if for some reason, delete failed
|
|
if (!tobsolete.empty()) {
|
|
WriteLock wl(&mutex_);
|
|
for (auto bfile : tobsolete) {
|
|
obsolete_files_.push_front(bfile);
|
|
}
|
|
}
|
|
|
|
return std::make_pair(!aborted, -1);
|
|
}
|
|
|
|
void BlobDBImpl::CopyBlobFiles(
|
|
std::vector<std::shared_ptr<BlobFile>>* bfiles_copy,
|
|
std::function<bool(const std::shared_ptr<BlobFile>&)> predicate) {
|
|
ReadLock rl(&mutex_);
|
|
|
|
for (auto const& p : blob_files_) {
|
|
bool pred_value = true;
|
|
if (predicate) {
|
|
pred_value = predicate(p.second);
|
|
}
|
|
if (pred_value) {
|
|
bfiles_copy->push_back(p.second);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Periodic garbage collection task. Currently a placeholder that performs no
// work: returns {false, -1} when `aborted`, otherwise {true, -1}.
std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
  // TODO(yiwu): Garbage collection implementation.

  // Reschedule unless the task was aborted.
  const bool reschedule = !aborted;
  return std::make_pair(reschedule, -1);
}
|
|
|
|
// Creates an iterator over the default column family that understands blob
// index entries. The base iterator is wrapped in a BlobDBIterator, which is
// also handed the snapshot created here (if any).
Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
  auto* default_cfh =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
  auto* cfd = default_cfh->cfd();

  // Read through a snapshot so that a blob file cannot be deleted between
  // fetching an index entry and reading the blob from the file. When the
  // caller did not provide one, a managed snapshot is created here.
  const Snapshot* read_snapshot = read_options.snapshot;
  ManagedSnapshot* managed_snapshot = nullptr;
  if (read_snapshot == nullptr) {
    managed_snapshot = new ManagedSnapshot(db_);
    read_snapshot = managed_snapshot->snapshot();
  }

  auto* base_iter = db_impl_->NewIteratorImpl(
      read_options, cfd, read_snapshot->GetSequenceNumber(),
      nullptr /*read_callback*/, true /*allow_blob*/);
  return new BlobDBIterator(managed_snapshot, base_iter, this, env_,
                            statistics_);
}
|
|
|
|
// Destroys a blob DB: deletes every blob file in the blob directory, removes
// the directory, then destroys the base DB with DestroyDB(). Returns the
// first failure seen among the blob file deletions and DestroyDB(), or OK.
// Results of listing and removing the blob directory are not checked.
Status DestroyBlobDB(const std::string& dbname, const Options& options,
                     const BlobDBOptions& bdb_options) {
  const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  Env* env = soptions.env;

  // blob_dir is taken relative to the DB directory when path_relative is
  // set, and used as given otherwise.
  std::string blob_dir_path;
  if (bdb_options.path_relative) {
    blob_dir_path = dbname + "/" + bdb_options.blob_dir;
  } else {
    blob_dir_path = bdb_options.blob_dir;
  }

  std::vector<std::string> children;
  env->GetChildren(blob_dir_path, &children);

  Status first_error;
  for (const auto& child : children) {
    uint64_t file_number;
    FileType file_type;
    const bool is_blob_file =
        ParseFileName(child, &file_number, &file_type) &&
        file_type == kBlobFile;
    if (!is_blob_file) {
      continue;
    }
    Status delete_status = env->DeleteFile(blob_dir_path + "/" + child);
    if (first_error.ok() && !delete_status.ok()) {
      first_error = delete_status;
    }
  }
  env->DeleteDir(blob_dir_path);

  Status destroy_status = DestroyDB(dbname, options);
  if (first_error.ok() && !destroy_status.ok()) {
    first_error = destroy_status;
  }

  return first_error;
}
|
|
|
|
#ifndef NDEBUG
|
|
// Test hook (debug builds only): forwards to GetBlobValue().
Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
                                     PinnableSlice* value) {
  Status result = GetBlobValue(key, index_entry, value);
  return result;
}
|
|
|
|
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
|
|
ReadLock l(&mutex_);
|
|
std::vector<std::shared_ptr<BlobFile>> blob_files;
|
|
for (auto& p : blob_files_) {
|
|
blob_files.emplace_back(p.second);
|
|
}
|
|
return blob_files;
|
|
}
|
|
|
|
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
|
|
const {
|
|
ReadLock l(&mutex_);
|
|
std::vector<std::shared_ptr<BlobFile>> obsolete_files;
|
|
for (auto& bfile : obsolete_files_) {
|
|
obsolete_files.emplace_back(bfile);
|
|
}
|
|
return obsolete_files;
|
|
}
|
|
|
|
// Test hook (debug builds only): runs one non-aborted pass of
// DeleteObsoleteFiles() and discards its scheduling result.
void BlobDBImpl::TEST_DeleteObsoleteFiles() {
  const bool aborted = false;
  DeleteObsoleteFiles(aborted);
}
|
|
|
|
// Test hook (debug builds only): forwards to CloseBlobFile().
Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
  Status close_status = CloseBlobFile(bfile);
  return close_status;
}
|
|
|
|
// Test hook (debug builds only): forwards to GCFileAndUpdateLSM().
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
                                           GCStats* gc_stats) {
  Status gc_status = GCFileAndUpdateLSM(bfile, gc_stats);
  return gc_status;
}
|
|
|
|
// Test hook (debug builds only): runs one non-aborted pass of RunGC() and
// discards its scheduling result.
void BlobDBImpl::TEST_RunGC() {
  const bool aborted = false;
  RunGC(aborted);
}
|
|
#endif // !NDEBUG
|
|
|
|
} // namespace blob_db
|
|
} // namespace rocksdb
|
|
#endif // ROCKSDB_LITE
|