mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-26 16:30:56 +00:00
5a2a6483dc
Summary: Adding the `min_blob_size` option to allow storing small values in base db (in LSM tree) together with the key. The goal is to improve performance for small values, while taking advantage of blob db's low write amplification for large values. Also adding expiration timestamp to blob index. It will be useful to evict stale blob indexes in base db by adding a compaction filter. I'll work on the compaction filter in future patches. See blob_index.h for the new blob index format. There are 4 cases when writing a new key: * small value w/o TTL: put in base db as normal value (i.e. ValueType::kTypeValue) * small value w/ TTL: put (type, expiration, value) to base db. * large value w/o TTL: write value to blob log and put (type, file, offset, size, compression) to base db. * large value w/TTL: write value to blob log and put (type, expiration, file, offset, size, compression) to base db. Closes https://github.com/facebook/rocksdb/pull/3066 Differential Revision: D6142115 Pulled By: yiwu-arbug fbshipit-source-id: 9526e76e19f0839310a3f5f2a43772a4ad182cd0
2886 lines
98 KiB
C++
2886 lines
98 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#include "db/db_impl.h"
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
#include <stdint.h>
|
|
#ifdef OS_SOLARIS
|
|
#include <alloca.h>
|
|
#endif
|
|
|
|
#include <algorithm>
|
|
#include <cstdio>
|
|
#include <map>
|
|
#include <set>
|
|
#include <stdexcept>
|
|
#include <string>
|
|
#include <unordered_map>
|
|
#include <unordered_set>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "db/builder.h"
|
|
#include "db/compaction_job.h"
|
|
#include "db/db_info_dumper.h"
|
|
#include "db/db_iter.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/event_helpers.h"
|
|
#include "db/external_sst_file_ingestion_job.h"
|
|
#include "db/flush_job.h"
|
|
#include "db/forward_iterator.h"
|
|
#include "db/job_context.h"
|
|
#include "db/log_reader.h"
|
|
#include "db/log_writer.h"
|
|
#include "db/malloc_stats.h"
|
|
#include "db/managed_iterator.h"
|
|
#include "db/memtable.h"
|
|
#include "db/memtable_list.h"
|
|
#include "db/merge_context.h"
|
|
#include "db/merge_helper.h"
|
|
#include "db/range_del_aggregator.h"
|
|
#include "db/table_cache.h"
|
|
#include "db/table_properties_collector.h"
|
|
#include "db/transaction_log_impl.h"
|
|
#include "db/version_set.h"
|
|
#include "db/write_batch_internal.h"
|
|
#include "db/write_callback.h"
|
|
#include "memtable/hash_linklist_rep.h"
|
|
#include "memtable/hash_skiplist_rep.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/thread_status_updater.h"
|
|
#include "monitoring/thread_status_util.h"
|
|
#include "options/cf_options.h"
|
|
#include "options/options_helper.h"
|
|
#include "options/options_parser.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "rocksdb/convenience.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/merge_operator.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/status.h"
|
|
#include "rocksdb/table.h"
|
|
#include "rocksdb/write_buffer_manager.h"
|
|
#include "table/block.h"
|
|
#include "table/block_based_table_factory.h"
|
|
#include "table/merging_iterator.h"
|
|
#include "table/table_builder.h"
|
|
#include "table/two_level_iterator.h"
|
|
#include "tools/sst_dump_tool_imp.h"
|
|
#include "util/auto_roll_logger.h"
|
|
#include "util/autovector.h"
|
|
#include "util/build_version.h"
|
|
#include "util/coding.h"
|
|
#include "util/compression.h"
|
|
#include "util/crc32c.h"
|
|
#include "util/file_reader_writer.h"
|
|
#include "util/file_util.h"
|
|
#include "util/filename.h"
|
|
#include "util/log_buffer.h"
|
|
#include "util/logging.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/sst_file_manager_impl.h"
|
|
#include "util/stop_watch.h"
|
|
#include "util/string_util.h"
|
|
#include "util/sync_point.h"
|
|
|
|
namespace rocksdb {
|
|
const std::string kDefaultColumnFamilyName("default");
|
|
void DumpRocksDBBuildVersion(Logger * log);
|
|
|
|
// Chooses the compression type to use when a memtable is flushed.
//
// - Universal compaction: the column family's configured compression, but
//   only when compression_size_percent is negative; otherwise no compression.
// - Otherwise, when a per-level compression list is configured: its first
//   entry (level 0).
// - Otherwise: the column family's configured compression.
CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options) {
  // Compressing memtable flushes might not help unless the sequential load
  // optimization is used for leveled compaction. Otherwise the CPU and
  // latency overhead is not offset by saving much space.
  if (ioptions.compaction_style == kCompactionStyleUniversal) {
    const bool negative_size_percent =
        ioptions.compaction_options_universal.compression_size_percent < 0;
    return negative_size_percent ? mutable_cf_options.compression
                                 : kNoCompression;
  }

  if (ioptions.compression_per_level.empty()) {
    return mutable_cf_options.compression;
  }

  // For leveled compress when min_level_to_compress != 0.
  return ioptions.compression_per_level[0];
}
|
|
|
|
namespace {
|
|
void DumpSupportInfo(Logger* logger) {
|
|
ROCKS_LOG_HEADER(logger, "Compression algorithms supported:");
|
|
ROCKS_LOG_HEADER(logger, "\tSnappy supported: %d", Snappy_Supported());
|
|
ROCKS_LOG_HEADER(logger, "\tZlib supported: %d", Zlib_Supported());
|
|
ROCKS_LOG_HEADER(logger, "\tBzip supported: %d", BZip2_Supported());
|
|
ROCKS_LOG_HEADER(logger, "\tLZ4 supported: %d", LZ4_Supported());
|
|
ROCKS_LOG_HEADER(logger, "\tZSTDNotFinal supported: %d",
|
|
ZSTDNotFinal_Supported());
|
|
ROCKS_LOG_HEADER(logger, "\tZSTD supported: %d", ZSTD_Supported());
|
|
ROCKS_LOG_HEADER(logger, "Fast CRC32 supported: %s",
|
|
crc32c::IsFastCrc32Supported().c_str());
|
|
}
|
|
|
|
int64_t kDefaultLowPriThrottledRate = 2 * 1024 * 1024;
|
|
} // namespace
|
|
|
|
// Builds the in-memory state of a DBImpl for the database named `dbname`.
//
// The initializer list sanitizes `options`, derives the immutable and mutable
// option sets from the sanitized copy, and starts every scheduling counter at
// zero. The body resolves the absolute DB path, creates the table cache and
// the VersionSet, and dumps build, file-summary, option and support
// information to the info log.
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
    : env_(options.env),
      dbname_(dbname),
      initial_db_options_(SanitizeOptions(dbname, options)),
      immutable_db_options_(initial_db_options_),
      mutable_db_options_(initial_db_options_),
      stats_(immutable_db_options_.statistics.get()),
      db_lock_(nullptr),
      mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
             immutable_db_options_.use_adaptive_mutex),
      shutting_down_(false),
      bg_cv_(&mutex_),
      logfile_number_(0),
      log_dir_synced_(false),
      log_empty_(true),
      default_cf_handle_(nullptr),
      log_sync_cv_(&mutex_),
      total_log_size_(0),
      max_total_in_memory_state_(0),
      is_snapshot_supported_(true),
      write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
      write_thread_(immutable_db_options_),
      nonmem_write_thread_(immutable_db_options_),
      write_controller_(mutable_db_options_.delayed_write_rate),
      // Use delayed_write_rate as a base line to determine the initial
      // low pri write rate limit. It may be adjusted later.
      low_pri_write_rate_limiter_(NewGenericRateLimiter(std::min(
          static_cast<int64_t>(mutable_db_options_.delayed_write_rate / 8),
          kDefaultLowPriThrottledRate))),
      last_batch_group_size_(0),
      unscheduled_flushes_(0),
      unscheduled_compactions_(0),
      bg_bottom_compaction_scheduled_(0),
      bg_compaction_scheduled_(0),
      num_running_compactions_(0),
      bg_flush_scheduled_(0),
      num_running_flushes_(0),
      bg_purge_scheduled_(0),
      disable_delete_obsolete_files_(0),
      delete_obsolete_files_last_run_(env_->NowMicros()),
      last_stats_dump_time_microsec_(0),
      next_job_id_(1),
      has_unpersisted_data_(false),
      unable_to_flush_oldest_log_(false),
      env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
      env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
          env_options_, immutable_db_options_)),
      num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
      wal_manager_(immutable_db_options_, env_options_),
#endif  // ROCKSDB_LITE
      event_logger_(immutable_db_options_.info_log.get()),
      bg_work_paused_(0),
      bg_compaction_paused_(0),
      refitting_level_(false),
      opened_successfully_(false),
      concurrent_prepare_(options.concurrent_prepare),
      manual_wal_flush_(options.manual_wal_flush),
      seq_per_batch_(options.seq_per_batch),
      // TODO(myabandeh): revise this when we change options.seq_per_batch
      use_custom_gc_(options.seq_per_batch) {
  env_->GetAbsolutePath(dbname, &db_absolute_path_);

  // Ten file descriptors or so are held back for other uses; the table cache
  // gets the remainder. A max_open_files of -1 means "no limit" and is mapped
  // to a large fixed capacity.
  const int max_open_files = mutable_db_options_.max_open_files;
  const bool unlimited_open_files = (max_open_files == -1);
  const int table_cache_size = unlimited_open_files
                                   ? TableCache::kInfiniteCapacity
                                   : max_open_files - 10;
  table_cache_ = NewLRUCache(table_cache_size,
                             immutable_db_options_.table_cache_numshardbits);

  versions_.reset(new VersionSet(dbname_, &immutable_db_options_, env_options_,
                                 table_cache_.get(), write_buffer_manager_,
                                 &write_controller_));
  column_family_memtables_.reset(
      new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));

  // Record the build version, existing DB files, effective options and
  // compiled-in feature support in the info log.
  Logger* const info_log = immutable_db_options_.info_log.get();
  DumpRocksDBBuildVersion(info_log);
  DumpDBFileSummary(immutable_db_options_, dbname_);
  immutable_db_options_.Dump(info_log);
  mutable_db_options_.Dump(info_log);
  DumpSupportInfo(info_log);
}
|
|
|
|
// Will lock the mutex_, will wait for completion if wait is true
|
|
void DBImpl::CancelAllBackgroundWork(bool wait) {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
|
|
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
|
"Shutdown: canceling all background work");
|
|
|
|
if (!shutting_down_.load(std::memory_order_acquire) &&
|
|
has_unpersisted_data_.load(std::memory_order_relaxed) &&
|
|
!mutable_db_options_.avoid_flush_during_shutdown) {
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
|
|
cfd->Ref();
|
|
mutex_.Unlock();
|
|
FlushMemTable(cfd, FlushOptions());
|
|
mutex_.Lock();
|
|
cfd->Unref();
|
|
}
|
|
}
|
|
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
|
|
}
|
|
|
|
shutting_down_.store(true, std::memory_order_release);
|
|
bg_cv_.SignalAll();
|
|
if (!wait) {
|
|
return;
|
|
}
|
|
// Wait for background work to finish
|
|
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
|
|
bg_flush_scheduled_) {
|
|
bg_cv_.Wait();
|
|
}
|
|
}
|
|
|
|
// Tears the database down: stops background work, drains the pending queues,
// purges obsolete files (only after a successful open), releases WAL writers
// and cached table handles, destroys the VersionSet and finally releases the
// DB file lock.
DBImpl::~DBImpl() {
  // Calling CancelAllBackgroundWork with `false` only sets the shutdown
  // marker. The waiting and unscheduling is done below in a slightly
  // different form (to consider: moving all the waiting into
  // CancelAllBackgroundWork(true)).
  CancelAllBackgroundWork(false);
  const int unscheduled_bottom_compactions =
      env_->UnSchedule(this, Env::Priority::BOTTOM);
  const int unscheduled_compactions =
      env_->UnSchedule(this, Env::Priority::LOW);
  const int unscheduled_flushes = env_->UnSchedule(this, Env::Priority::HIGH);
  mutex_.Lock();
  bg_bottom_compaction_scheduled_ -= unscheduled_bottom_compactions;
  bg_compaction_scheduled_ -= unscheduled_compactions;
  bg_flush_scheduled_ -= unscheduled_flushes;

  // Wait for the jobs that were already running to finish.
  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
         bg_flush_scheduled_ || bg_purge_scheduled_) {
    TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
    bg_cv_.Wait();
  }
  EraseThreadStatusDbInfo();
  flush_scheduler_.Clear();

  // Drop the references held by the flush and compaction queues, deleting
  // any column family whose last reference this was.
  while (!flush_queue_.empty()) {
    auto queued_cfd = PopFirstFromFlushQueue();
    if (queued_cfd->Unref()) {
      delete queued_cfd;
    }
  }
  while (!compaction_queue_.empty()) {
    auto queued_cfd = PopFirstFromCompactionQueue();
    if (queued_cfd->Unref()) {
      delete queued_cfd;
    }
  }

  if (default_cf_handle_) {
    // The handle does its own locking, so it has to be deleted without the
    // mutex held.
    mutex_.Unlock();
    delete default_cf_handle_;
    mutex_.Lock();
  }

  // Clean up obsolete files due to SuperVersion release.
  // (1) Obsolete files have to be deleted before closing because RepairDB()
  // scans all existing files in the file system and builds a manifest file.
  // Keeping obsolete files confuses the repair process.
  // (2) We must check that Open()/Recover() succeeded before deleting,
  // because if VersionSet recovery fails (maybe due to a corrupted manifest
  // file) it cannot identify live files correctly. As a result, all "live"
  // files could get deleted by accident. However, a corrupted manifest is
  // recoverable by RepairDB().
  if (opened_successfully_) {
    JobContext job_context(next_job_id_.fetch_add(1));
    FindObsoleteFiles(&job_context, true);

    mutex_.Unlock();
    // manifest number starting from 2
    job_context.manifest_file_number = 1;
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
    job_context.Clean();
    mutex_.Lock();
  }

  for (auto pending_writer : logs_to_free_) {
    delete pending_writer;
  }
  for (auto& live_log : logs_) {
    live_log.ClearWriter();
  }
  logs_.clear();

  // Table cache may have table handles holding blocks from the block cache.
  // We need to release them before the block cache is destroyed. The block
  // cache may be destroyed inside versions_.reset(), when the column family
  // data list is destroyed, so leaving handles in the table cache after
  // versions_.reset() may cause issues.
  // Here we clean all unreferenced handles in the table cache.
  // Now we assume all user queries have finished, so only the version set
  // itself can possibly hold blocks from the block cache. After releasing
  // unreferenced handles here, only handles held by the version set are left,
  // and they are released inside versions_.reset(). There, we need to make
  // sure that every time a handle is released it is erased from the cache
  // too. By doing that, we can guarantee that after versions_.reset() the
  // table cache is empty, so the cache can be safely destroyed.
  table_cache_->EraseUnRefEntries();

  for (auto& recovered : recovered_transactions_) {
    delete recovered.second;
  }

  // versions_ has to be destroyed before table_cache_ since it can hold
  // references to table_cache_.
  versions_.reset();
  mutex_.Unlock();
  if (db_lock_) {
    env_->UnlockFile(db_lock_);
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Shutdown complete");
  LogFlush(immutable_db_options_.info_log);
}
|
|
|
|
// Downgrades a failure in `*s` to OK when paranoid_checks is disabled.
//
// An OK status, or any status while paranoid_checks is enabled, is left
// untouched. Otherwise the error is logged as a warning and `*s` is reset to
// Status::OK().
void DBImpl::MaybeIgnoreError(Status* s) const {
  if (s->ok() || immutable_db_options_.paranoid_checks) {
    return;  // Nothing to ignore.
  }
  ROCKS_LOG_WARN(immutable_db_options_.info_log, "Ignoring error %s",
                 s->ToString().c_str());
  *s = Status::OK();
}
|
|
|
|
// Creates the WAL archive directory if either wal_ttl_seconds or
// wal_size_limit_mb is positive.
//
// Returns the status of the directory creation, or OK when neither option is
// set and nothing had to be done.
const Status DBImpl::CreateArchivalDirectory() {
  const bool archival_configured =
      immutable_db_options_.wal_ttl_seconds > 0 ||
      immutable_db_options_.wal_size_limit_mb > 0;
  if (!archival_configured) {
    return Status::OK();
  }
  const std::string archive_dir =
      ArchivalDirectory(immutable_db_options_.wal_dir);
  return env_->CreateDirIfMissing(archive_dir);
}
|
|
|
|
void DBImpl::PrintStatistics() {
|
|
auto dbstats = immutable_db_options_.statistics.get();
|
|
if (dbstats) {
|
|
ROCKS_LOG_WARN(immutable_db_options_.info_log, "STATISTICS:\n %s",
|
|
dbstats->ToString().c_str());
|
|
}
|
|
}
|
|
|
|
void DBImpl::MaybeDumpStats() {
|
|
mutex_.Lock();
|
|
unsigned int stats_dump_period_sec =
|
|
mutable_db_options_.stats_dump_period_sec;
|
|
mutex_.Unlock();
|
|
if (stats_dump_period_sec == 0) return;
|
|
|
|
const uint64_t now_micros = env_->NowMicros();
|
|
|
|
if (last_stats_dump_time_microsec_ + stats_dump_period_sec * 1000000 <=
|
|
now_micros) {
|
|
// Multiple threads could race in here simultaneously.
|
|
// However, the last one will update last_stats_dump_time_microsec_
|
|
// atomically. We could see more than one dump during one dump
|
|
// period in rare cases.
|
|
last_stats_dump_time_microsec_ = now_micros;
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
const DBPropertyInfo* cf_property_info =
|
|
GetPropertyInfo(DB::Properties::kCFStats);
|
|
assert(cf_property_info != nullptr);
|
|
const DBPropertyInfo* db_property_info =
|
|
GetPropertyInfo(DB::Properties::kDBStats);
|
|
assert(db_property_info != nullptr);
|
|
|
|
std::string stats;
|
|
{
|
|
InstrumentedMutexLock l(&mutex_);
|
|
default_cf_internal_stats_->GetStringProperty(
|
|
*db_property_info, DB::Properties::kDBStats, &stats);
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
if (cfd->initialized()) {
|
|
cfd->internal_stats()->GetStringProperty(
|
|
*cf_property_info, DB::Properties::kCFStatsNoFileHistogram,
|
|
&stats);
|
|
}
|
|
}
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
if (cfd->initialized()) {
|
|
cfd->internal_stats()->GetStringProperty(
|
|
*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
|
|
}
|
|
}
|
|
}
|
|
ROCKS_LOG_WARN(immutable_db_options_.info_log,
|
|
"------- DUMPING STATS -------");
|
|
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
|
|
if (immutable_db_options_.dump_malloc_stats) {
|
|
stats.clear();
|
|
DumpMallocStats(&stats);
|
|
if (!stats.empty()) {
|
|
ROCKS_LOG_WARN(immutable_db_options_.info_log,
|
|
"------- Malloc STATS -------");
|
|
ROCKS_LOG_WARN(immutable_db_options_.info_log, "%s", stats.c_str());
|
|
}
|
|
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
PrintStatistics();
|
|
}
|
|
}
|
|
|
|
// Moves every log writer listed in `job_context->logs_to_free` onto the
// logs-to-free queue, empties that list, and schedules a purge job. Does
// nothing when the list is already empty.
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
  if (job_context->logs_to_free.empty()) {
    return;
  }
  for (auto writer : job_context->logs_to_free) {
    AddToLogsToFreeQueue(writer);
  }
  job_context->logs_to_free.clear();
  SchedulePurge();
}
|
|
|
|
// Returns the directory object stored for data path `path_id`, or the DB
// directory when no object is stored for that path. `path_id` must be a
// valid index into data_dirs_ (asserted).
Directory* DBImpl::Directories::GetDataDir(size_t path_id) {
  assert(path_id < data_dirs_.size());
  Directory* const data_dir = data_dirs_[path_id].get();
  // A null entry means the DB directory should be used instead.
  return data_dir != nullptr ? data_dir : db_dir_.get();
}
|
|
|
|
// Applies the mutable column-family options in `options_map` to
// `column_family`.
//
// On success a dummy version edit is logged so that compaction scores are
// recomputed, a new super version is installed (which may schedule flushes
// or compactions), and the options file is rewritten.
//
// Returns:
//   - NotSupported in ROCKSDB_LITE builds;
//   - InvalidArgument when `options_map` is empty;
//   - the error from cfd->SetOptions() or from LogAndApply() if either fails;
//   - the options-file persistence error if that step fails;
//   - OK otherwise.
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetOptions() on column family [%s], empty input",
                   cfd->GetName().c_str());
    return Status::InvalidArgument("empty input");
  }

  MutableCFOptions new_options;
  Status s;
  Status persist_options_status;
  // NOTE(review): `w` is not referenced anywhere in this function; confirm
  // it can be removed.
  WriteThread::Writer w;
  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);
    s = cfd->SetOptions(options_map);
    if (s.ok()) {
      new_options = *cfd->GetLatestMutableCFOptions();
      // Append new version to recompute compaction score. The result is kept
      // in `s` so that a failure to log the edit is reported to the caller
      // instead of being silently dropped.
      VersionEdit dummy_edit;
      s = versions_->LogAndApply(cfd, new_options, &dummy_edit, &mutex_,
                                 directories_.GetDbDir());
      // Trigger possible flush/compactions. This has to be before we persist
      // options to file, otherwise there will be a deadlock with writer
      // thread.
      InstallSuperVersionAndScheduleWork(cfd, &sv_context, new_options);

      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    }
  }
  sv_context.Clean();

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "SetOptions() on column family [%s], inputs:",
                 cfd->GetName().c_str());
  for (const auto& o : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n", o.first.c_str(),
                   o.second.c_str());
  }
  if (s.ok()) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[%s] SetOptions() succeeded", cfd->GetName().c_str());
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      s = persist_options_status;
    }
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
                   cfd->GetName().c_str());
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
|
|
|
|
// Applies the mutable DB-wide options in `options_map`.
//
// Under the DB mutex the new option set is parsed and installed: the
// background thread pool is grown if more compactions are allowed, the
// delayed write rate and table-cache capacity are updated, and the
// compaction EnvOptions are rebuilt. Then, inside an unbatched write, the WAL
// is switched if it exceeds the size limit or wal_bytes_per_sync changed, and
// the options file is rewritten.
//
// Returns NotSupported in ROCKSDB_LITE builds, InvalidArgument for an empty
// map, the parse error if parsing fails, and an IOError if persisting the
// options file fails while fail_if_options_file_error is set.
Status DBImpl::SetDBOptions(
    const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (options_map.empty()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "SetDBOptions(), empty input.");
    return Status::InvalidArgument("empty input");
  }

  MutableDBOptions new_options;
  Status s;
  Status persist_options_status;
  bool wal_changed = false;
  WriteThread::Writer w;
  WriteContext write_context;
  {
    InstrumentedMutexLock l(&mutex_);
    s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map,
                                       &new_options);
    if (s.ok()) {
      const bool more_compactions_allowed =
          new_options.max_background_compactions >
          mutable_db_options_.max_background_compactions;
      if (more_compactions_allowed) {
        env_->IncBackgroundThreadsIfNeeded(
            new_options.max_background_compactions, Env::Priority::LOW);
        MaybeScheduleFlushOrCompaction();
      }

      write_controller_.set_max_delayed_write_rate(
          new_options.delayed_write_rate);

      // Same sizing rule as at construction: -1 means unlimited, otherwise
      // ten descriptors are held back for other uses.
      const int new_table_cache_capacity =
          (new_options.max_open_files == -1)
              ? TableCache::kInfiniteCapacity
              : new_options.max_open_files - 10;
      table_cache_->SetCapacity(new_table_cache_capacity);

      // Must be computed before mutable_db_options_ is overwritten below.
      wal_changed = mutable_db_options_.wal_bytes_per_sync !=
                    new_options.wal_bytes_per_sync;
      if (new_options.bytes_per_sync == 0) {
        new_options.bytes_per_sync = 1024 * 1024;
      }
      mutable_db_options_ = new_options;

      env_options_for_compaction_ = EnvOptions(
          BuildDBOptions(immutable_db_options_, mutable_db_options_));
      env_options_for_compaction_ = env_->OptimizeForCompactionTableWrite(
          env_options_for_compaction_, immutable_db_options_);

      write_thread_.EnterUnbatched(&w, &mutex_);
      if (total_log_size_ > GetMaxTotalWalSize() || wal_changed) {
        Status purge_wal_status = SwitchWAL(&write_context);
        if (!purge_wal_status.ok()) {
          ROCKS_LOG_WARN(immutable_db_options_.info_log,
                         "Unable to purge WAL files in SetDBOptions() -- %s",
                         purge_wal_status.ToString().c_str());
        }
      }
      persist_options_status = WriteOptionsFile(
          false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
      write_thread_.ExitUnbatched(&w);
    }
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions(), inputs:");
  for (const auto& option : options_map) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s: %s\n",
                   option.first.c_str(), option.second.c_str());
  }

  if (!s.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "SetDBOptions failed");
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "SetDBOptions() succeeded");
    new_options.Dump(immutable_db_options_.info_log.get());
    if (!persist_options_status.ok()) {
      // A persistence failure only fails the call when the user asked for
      // that; it is logged either way.
      if (immutable_db_options_.fail_if_options_file_error) {
        s = Status::IOError(
            "SetDBOptions() succeeded, but unable to persist options",
            persist_options_status.ToString());
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Unable to persist options in SetDBOptions() -- %s",
                     persist_options_status.ToString().c_str());
    }
  }
  LogFlush(immutable_db_options_.info_log);
  return s;
#endif  // ROCKSDB_LITE
}
|
|
|
|
// return the same level if it cannot be moved
|
|
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
|
|
const MutableCFOptions& mutable_cf_options, int level) {
|
|
mutex_.AssertHeld();
|
|
const auto* vstorage = cfd->current()->storage_info();
|
|
int minimum_level = level;
|
|
for (int i = level - 1; i > 0; --i) {
|
|
// stop if level i is not empty
|
|
if (vstorage->NumLevelFiles(i) > 0) break;
|
|
// stop if level i is too small (cannot fit the level files)
|
|
if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) {
|
|
break;
|
|
}
|
|
|
|
minimum_level = i;
|
|
}
|
|
return minimum_level;
|
|
}
|
|
|
|
// Writes the buffered contents of the current WAL writer to its file and,
// when `sync` is true, additionally syncs the WAL via SyncWAL().
//
// Returns the WriteBuffer() error if the buffer write fails. In that case
// SyncWAL() is not attempted, so the write error can no longer be replaced by
// a successful sync status. Otherwise returns OK (sync == false) or the
// result of SyncWAL() (sync == true).
Status DBImpl::FlushWAL(bool sync) {
  {
    // We need to lock log_write_mutex_ since logs_ might change concurrently
    InstrumentedMutexLock wl(&log_write_mutex_);
    log::Writer* cur_log_writer = logs_.back().writer;
    auto s = cur_log_writer->WriteBuffer();
    if (!s.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
                      s.ToString().c_str());
      // Whether syncing or not, abort here: continuing to SyncWAL() would
      // hide this failure from the caller.
      return s;
    }
    if (!sync) {
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
      return s;
    }
  }
  // sync = true
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
  return SyncWAL();
}
|
|
|
|
// Syncs every WAL file up to and including the current log number.
//
// Phase 1 (under mutex_): wait for any sync in progress on the oldest log,
// verify each file can be synced safely from this thread, and mark the logs
// as getting synced.
// Phase 2 (no mutex): sync each file without flushing, then the WAL
// directory if it has not been synced yet.
// Phase 3 (under mutex_): record the outcome via MarkLogsSynced().
//
// Returns NotSupported if a WAL file is not sync-thread-safe; otherwise the
// status of the last sync operation attempted.
Status DBImpl::SyncWAL() {
  autovector<log::Writer*, 1> logs_to_sync;
  bool need_log_dir_sync;
  uint64_t current_log_number;

  {
    InstrumentedMutexLock l(&mutex_);
    assert(!logs_.empty());

    // This SyncWAL() call only cares about logs up to this number.
    current_log_number = logfile_number_;

    while (logs_.front().number <= current_log_number &&
           logs_.front().getting_synced) {
      log_sync_cv_.Wait();
    }

    // First check that logs are safe to sync in background.
    for (auto candidate = logs_.begin();
         candidate != logs_.end() && candidate->number <= current_log_number;
         ++candidate) {
      if (!candidate->writer->file()->writable_file()->IsSyncThreadSafe()) {
        return Status::NotSupported(
            "SyncWAL() is not supported for this implementation of WAL file",
            immutable_db_options_.allow_mmap_writes
                ? "try setting Options::allow_mmap_writes to false"
                : Slice());
      }
    }

    // All files passed the check: claim them for this sync.
    for (auto claimed = logs_.begin();
         claimed != logs_.end() && claimed->number <= current_log_number;
         ++claimed) {
      assert(!claimed->getting_synced);
      claimed->getting_synced = true;
      logs_to_sync.push_back(claimed->writer);
    }

    need_log_dir_sync = !log_dir_synced_;
  }

  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
  RecordTick(stats_, WAL_FILE_SYNCED);
  Status status;
  for (log::Writer* writer : logs_to_sync) {
    status = writer->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
    if (!status.ok()) {
      break;  // Stop at the first failing file.
    }
  }
  if (status.ok() && need_log_dir_sync) {
    status = directories_.GetWalDir()->Fsync();
  }
  TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
  {
    InstrumentedMutexLock l(&mutex_);
    MarkLogsSynced(current_log_number, need_log_dir_sync, status);
  }
  TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

  return status;
}
|
|
|
|
void DBImpl::MarkLogsSynced(
|
|
uint64_t up_to, bool synced_dir, const Status& status) {
|
|
mutex_.AssertHeld();
|
|
if (synced_dir &&
|
|
logfile_number_ == up_to &&
|
|
status.ok()) {
|
|
log_dir_synced_ = true;
|
|
}
|
|
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
|
|
auto& log = *it;
|
|
assert(log.getting_synced);
|
|
if (status.ok() && logs_.size() > 1) {
|
|
logs_to_free_.push_back(log.ReleaseWriter());
|
|
it = logs_.erase(it);
|
|
} else {
|
|
log.getting_synced = false;
|
|
++it;
|
|
}
|
|
}
|
|
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
|
|
(logs_.size() == 1 && !logs_[0].getting_synced));
|
|
log_sync_cv_.SignalAll();
|
|
}
|
|
|
|
// Returns the last sequence number reported by the VersionSet.
SequenceNumber DBImpl::GetLatestSequenceNumber() const {
  const SequenceNumber latest = versions_->LastSequence();
  return latest;
}
|
|
|
|
// Advances the VersionSet's last-to-be-written sequence by one and returns
// the value fetched by that call plus one.
SequenceNumber DBImpl::IncAndFetchSequenceNumber() {
  const auto fetched = versions_->FetchAddLastToBeWrittenSequence(1ull);
  return fetched + 1ull;
}
|
|
|
|
// Creates an internal iterator over `column_family` (the default column
// family when null) using default ReadOptions.
//
// Takes a reference on the column family's current super version under the
// DB mutex and forwards to the overload that takes ReadOptions and a super
// version.
InternalIterator* DBImpl::NewInternalIterator(
    Arena* arena, RangeDelAggregator* range_del_agg,
    ColumnFamilyHandle* column_family) {
  ColumnFamilyData* cfd =
      (column_family == nullptr)
          ? default_cf_handle_->cfd()
          : reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  SuperVersion* super_version = nullptr;
  {
    InstrumentedMutexLock l(&mutex_);
    super_version = cfd->GetSuperVersion()->Ref();
  }

  ReadOptions roptions;
  return NewInternalIterator(roptions, cfd, super_version, arena,
                             range_del_agg);
}
|
|
|
|
// Counts one more pending purge job and schedules BGWorkPurge on the
// high-priority thread pool. Requires mutex_ to be held and the DB to have
// been opened successfully (asserted).
void DBImpl::SchedulePurge() {
  mutex_.AssertHeld();
  assert(opened_successfully_);

  // Purge jobs go to the HIGH priority queue.
  ++bg_purge_scheduled_;
  env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr);
}
|
|
|
|
void DBImpl::BackgroundCallPurge() {
|
|
mutex_.Lock();
|
|
|
|
// We use one single loop to clear both queues so that after existing the loop
|
|
// both queues are empty. This is stricter than what is needed, but can make
|
|
// it easier for us to reason the correctness.
|
|
while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
|
|
if (!purge_queue_.empty()) {
|
|
auto purge_file = purge_queue_.begin();
|
|
auto fname = purge_file->fname;
|
|
auto type = purge_file->type;
|
|
auto number = purge_file->number;
|
|
auto path_id = purge_file->path_id;
|
|
auto job_id = purge_file->job_id;
|
|
purge_queue_.pop_front();
|
|
|
|
mutex_.Unlock();
|
|
DeleteObsoleteFileImpl(job_id, fname, type, number, path_id);
|
|
mutex_.Lock();
|
|
} else {
|
|
assert(!logs_to_free_queue_.empty());
|
|
log::Writer* log_writer = *(logs_to_free_queue_.begin());
|
|
logs_to_free_queue_.pop_front();
|
|
mutex_.Unlock();
|
|
delete log_writer;
|
|
mutex_.Lock();
|
|
}
|
|
}
|
|
bg_purge_scheduled_--;
|
|
|
|
bg_cv_.SignalAll();
|
|
// IMPORTANT:there should be no code after calling SignalAll. This call may
|
|
// signal the DB destructor that it's OK to proceed with destruction. In
|
|
// that case, all DB variables will be dealloacated and referencing them
|
|
// will cause trouble.
|
|
mutex_.Unlock();
|
|
}
|
|
|
|
namespace {
|
|
struct IterState {
|
|
IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version,
|
|
bool _background_purge)
|
|
: db(_db),
|
|
mu(_mu),
|
|
super_version(_super_version),
|
|
background_purge(_background_purge) {}
|
|
|
|
DBImpl* db;
|
|
InstrumentedMutex* mu;
|
|
SuperVersion* super_version;
|
|
bool background_purge;
|
|
};
|
|
|
|
static void CleanupIteratorState(void* arg1, void* arg2) {
|
|
IterState* state = reinterpret_cast<IterState*>(arg1);
|
|
|
|
if (state->super_version->Unref()) {
|
|
// Job id == 0 means that this is not our background process, but rather
|
|
// user thread
|
|
JobContext job_context(0);
|
|
|
|
state->mu->Lock();
|
|
state->super_version->Cleanup();
|
|
state->db->FindObsoleteFiles(&job_context, false, true);
|
|
if (state->background_purge) {
|
|
state->db->ScheduleBgLogWriterClose(&job_context);
|
|
}
|
|
state->mu->Unlock();
|
|
|
|
delete state->super_version;
|
|
if (job_context.HaveSomethingToDelete()) {
|
|
if (state->background_purge) {
|
|
// PurgeObsoleteFiles here does not delete files. Instead, it adds the
|
|
// files to be deleted to a job queue, and deletes it in a separate
|
|
// background thread.
|
|
state->db->PurgeObsoleteFiles(job_context, true /* schedule only */);
|
|
state->mu->Lock();
|
|
state->db->SchedulePurge();
|
|
state->mu->Unlock();
|
|
} else {
|
|
state->db->PurgeObsoleteFiles(job_context);
|
|
}
|
|
}
|
|
job_context.Clean();
|
|
}
|
|
|
|
delete state;
|
|
}
|
|
} // namespace
|
|
|
|
// Builds a merging internal iterator over the mutable memtable, the immutable
// memtables and (unless read_tier is kMemtableTier) the files of
// `super_version`, allocated from `arena`.
//
// Range tombstones are added to `range_del_agg` unless
// read_options.ignore_range_deletions is set. On success the returned
// iterator has a cleanup callback registered that releases `super_version`.
// On failure `super_version` is cleaned up here and an error iterator
// carrying the failure status is returned. `arena` and `range_del_agg` must
// be non-null (asserted).
InternalIterator* DBImpl::NewInternalIterator(
    const ReadOptions& read_options, ColumnFamilyData* cfd,
    SuperVersion* super_version, Arena* arena,
    RangeDelAggregator* range_del_agg) {
  assert(arena != nullptr);
  assert(range_del_agg != nullptr);

  // Need to create internal iterator from the arena.
  const bool has_prefix_extractor_without_total_order =
      !read_options.total_order_seek &&
      cfd->ioptions()->prefix_extractor != nullptr;
  MergeIteratorBuilder merge_iter_builder(
      &cfd->internal_comparator(), arena,
      has_prefix_extractor_without_total_order);

  // Iterator for the mutable memtable.
  merge_iter_builder.AddIterator(
      super_version->mem->NewIterator(read_options, arena));

  Status s;
  if (!read_options.ignore_range_deletions) {
    std::unique_ptr<InternalIterator> range_del_iter(
        super_version->mem->NewRangeTombstoneIterator(read_options));
    s = range_del_agg->AddTombstones(std::move(range_del_iter));
  }

  // Child iterators for the immutable memtables.
  if (s.ok()) {
    super_version->imm->AddIterators(read_options, &merge_iter_builder);
    if (!read_options.ignore_range_deletions) {
      s = super_version->imm->AddRangeTombstoneIterators(read_options, arena,
                                                         range_del_agg);
    }
  }
  TEST_SYNC_POINT_CALLBACK("DBImpl::NewInternalIterator:StatusCallback", &s);

  if (!s.ok()) {
    CleanupSuperVersion(super_version);
    return NewErrorInternalIterator(s, arena);
  }

  // Iterators for files in L0 - Ln.
  if (read_options.read_tier != kMemtableTier) {
    super_version->current->AddIterators(read_options, env_options_,
                                         &merge_iter_builder, range_del_agg);
  }
  InternalIterator* merged_iter = merge_iter_builder.Finish();
  IterState* cleanup_state =
      new IterState(this, &mutex_, super_version,
                    read_options.background_purge_on_iterator_cleanup);
  merged_iter->RegisterCleanup(CleanupIteratorState, cleanup_state, nullptr);

  return merged_iter;
}
|
|
|
|
// Returns the handle stored in default_cf_handle_.
ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const {
  return default_cf_handle_;
}
|
|
|
|
// Point lookup entry point; forwards directly to GetImpl() with the
// remaining GetImpl() arguments left at their defaults.
Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  return GetImpl(read_options, column_family, key, value);
}
|
|
|
|
// Shared implementation of point lookups.
//
// Looks `key` up in the mutable memtable, then the immutable memtables, then
// the current Version's files, writing the result into `pinnable_val` (must
// not be null). `value_found`, `callback` and `is_blob_index` are passed
// through unchanged to the memtable / Version lookups.
//
// Returns the status produced by the lookup. The referenced SuperVersion is
// always returned before this function exits.
Status DBImpl::GetImpl(const ReadOptions& read_options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       PinnableSlice* pinnable_val, bool* value_found,
                       ReadCallback* callback, bool* is_blob_index) {
  assert(pinnable_val != nullptr);
  StopWatch sw(env_, stats_, DB_GET);
  PERF_TIMER_GUARD(get_snapshot_time);

  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  // Acquire SuperVersion
  SuperVersion* sv = GetAndRefSuperVersion(cfd);

  TEST_SYNC_POINT("DBImpl::GetImpl:1");
  TEST_SYNC_POINT("DBImpl::GetImpl:2");

  SequenceNumber snapshot;
  if (read_options.snapshot == nullptr) {
    // The super version is referenced before the sequence number is read,
    // and no mutex is held in between. A memtable switch may therefore
    // happen in the middle, so not all data for this sequence number is
    // necessarily available; what is visible is everything contained in the
    // super version we hold, which is also a valid snapshot to read from.
    // The opposite order would be wrong: a flush between taking the
    // sequence number and referencing the super version may compact away
    // data for that snapshot, and users could then see wrong results.
    snapshot = concurrent_prepare_ && seq_per_batch_
                   ? versions_->LastToBeWrittenSequence()
                   : versions_->LastSequence();
  } else {
    // Note: In WritePrepared txns this is not necessary but not harmful
    // either. Because prep_seq > snapshot => commit_seq > snapshot, so if a
    // snapshot is specified we should be fine with skipping seq numbers that
    // are greater than that.
    snapshot =
        reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
  }
  TEST_SYNC_POINT("DBImpl::GetImpl:3");
  TEST_SYNC_POINT("DBImpl::GetImpl:4");

  // Collects merge operands if a merge is encountered during the lookup.
  MergeContext merge_context;
  RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);

  // `s` is both in/out for the lookups below: on input it is either OK or
  // MergeInProgress; in the latter case merge_context holds the operands
  // gathered so far.
  Status s;
  LookupKey lkey(key, snapshot);
  PERF_TIMER_STOP(get_snapshot_time);

  const bool skip_memtable =
      (read_options.read_tier == kPersistedTier &&
       has_unpersisted_data_.load(std::memory_order_relaxed));
  bool found_in_memtable = false;
  if (!skip_memtable) {
    // Mutable memtable first, then the immutable ones (only if the first
    // lookup left the status in a state that allows continuing).
    found_in_memtable =
        sv->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                     &range_del_agg, read_options, callback, is_blob_index);
    if (!found_in_memtable && (s.ok() || s.IsMergeInProgress())) {
      found_in_memtable =
          sv->imm->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
                       &range_del_agg, read_options, callback, is_blob_index);
    }

    if (found_in_memtable) {
      pinnable_val->PinSelf();
      RecordTick(stats_, MEMTABLE_HIT);
    } else if (!s.ok() && !s.IsMergeInProgress()) {
      // Hard error from the memtables: give up without touching the files.
      ReturnAndCleanupSuperVersion(cfd, sv);
      return s;
    }
  }

  if (!found_in_memtable) {
    PERF_TIMER_GUARD(get_from_output_files_time);
    sv->current->Get(read_options, lkey, pinnable_val, &s, &merge_context,
                     &range_del_agg, value_found, nullptr, nullptr, callback,
                     is_blob_index);
    RecordTick(stats_, MEMTABLE_MISS);
  }

  {
    PERF_TIMER_GUARD(get_post_process_time);

    ReturnAndCleanupSuperVersion(cfd, sv);

    RecordTick(stats_, NUMBER_KEYS_READ);
    const size_t bytes = pinnable_val->size();
    RecordTick(stats_, BYTES_READ, bytes);
    MeasureTime(stats_, BYTES_PER_READ, bytes);
    PERF_COUNTER_ADD(get_read_bytes, bytes);
  }
  return s;
}
|
|
|
|
std::vector<Status> DBImpl::MultiGet(
|
|
const ReadOptions& read_options,
|
|
const std::vector<ColumnFamilyHandle*>& column_family,
|
|
const std::vector<Slice>& keys, std::vector<std::string>* values) {
|
|
|
|
StopWatch sw(env_, stats_, DB_MULTIGET);
|
|
PERF_TIMER_GUARD(get_snapshot_time);
|
|
|
|
SequenceNumber snapshot;
|
|
|
|
struct MultiGetColumnFamilyData {
|
|
ColumnFamilyData* cfd;
|
|
SuperVersion* super_version;
|
|
};
|
|
std::unordered_map<uint32_t, MultiGetColumnFamilyData*> multiget_cf_data;
|
|
// fill up and allocate outside of mutex
|
|
for (auto cf : column_family) {
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(cf);
|
|
auto cfd = cfh->cfd();
|
|
if (multiget_cf_data.find(cfd->GetID()) == multiget_cf_data.end()) {
|
|
auto mgcfd = new MultiGetColumnFamilyData();
|
|
mgcfd->cfd = cfd;
|
|
multiget_cf_data.insert({cfd->GetID(), mgcfd});
|
|
}
|
|
}
|
|
|
|
mutex_.Lock();
|
|
if (read_options.snapshot != nullptr) {
|
|
snapshot = reinterpret_cast<const SnapshotImpl*>(
|
|
read_options.snapshot)->number_;
|
|
} else {
|
|
snapshot = concurrent_prepare_ && seq_per_batch_
|
|
? versions_->LastToBeWrittenSequence()
|
|
: versions_->LastSequence();
|
|
}
|
|
for (auto mgd_iter : multiget_cf_data) {
|
|
mgd_iter.second->super_version =
|
|
mgd_iter.second->cfd->GetSuperVersion()->Ref();
|
|
}
|
|
mutex_.Unlock();
|
|
|
|
// Contain a list of merge operations if merge occurs.
|
|
MergeContext merge_context;
|
|
|
|
// Note: this always resizes the values array
|
|
size_t num_keys = keys.size();
|
|
std::vector<Status> stat_list(num_keys);
|
|
values->resize(num_keys);
|
|
|
|
// Keep track of bytes that we read for statistics-recording later
|
|
uint64_t bytes_read = 0;
|
|
PERF_TIMER_STOP(get_snapshot_time);
|
|
|
|
// For each of the given keys, apply the entire "get" process as follows:
|
|
// First look in the memtable, then in the immutable memtable (if any).
|
|
// s is both in/out. When in, s could either be OK or MergeInProgress.
|
|
// merge_operands will contain the sequence of merges in the latter case.
|
|
for (size_t i = 0; i < num_keys; ++i) {
|
|
merge_context.Clear();
|
|
Status& s = stat_list[i];
|
|
std::string* value = &(*values)[i];
|
|
|
|
LookupKey lkey(keys[i], snapshot);
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
|
|
RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(),
|
|
snapshot);
|
|
auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
|
|
assert(mgd_iter != multiget_cf_data.end());
|
|
auto mgd = mgd_iter->second;
|
|
auto super_version = mgd->super_version;
|
|
bool skip_memtable =
|
|
(read_options.read_tier == kPersistedTier &&
|
|
has_unpersisted_data_.load(std::memory_order_relaxed));
|
|
bool done = false;
|
|
if (!skip_memtable) {
|
|
if (super_version->mem->Get(lkey, value, &s, &merge_context,
|
|
&range_del_agg, read_options)) {
|
|
done = true;
|
|
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
|
|
} else if (super_version->imm->Get(lkey, value, &s, &merge_context,
|
|
&range_del_agg, read_options)) {
|
|
done = true;
|
|
// TODO(?): RecordTick(stats_, MEMTABLE_HIT)?
|
|
}
|
|
}
|
|
if (!done) {
|
|
PinnableSlice pinnable_val;
|
|
PERF_TIMER_GUARD(get_from_output_files_time);
|
|
super_version->current->Get(read_options, lkey, &pinnable_val, &s,
|
|
&merge_context, &range_del_agg);
|
|
value->assign(pinnable_val.data(), pinnable_val.size());
|
|
// TODO(?): RecordTick(stats_, MEMTABLE_MISS)?
|
|
}
|
|
|
|
if (s.ok()) {
|
|
bytes_read += value->size();
|
|
}
|
|
}
|
|
|
|
// Post processing (decrement reference counts and record statistics)
|
|
PERF_TIMER_GUARD(get_post_process_time);
|
|
autovector<SuperVersion*> superversions_to_delete;
|
|
|
|
// TODO(icanadi) do we need lock here or just around Cleanup()?
|
|
mutex_.Lock();
|
|
for (auto mgd_iter : multiget_cf_data) {
|
|
auto mgd = mgd_iter.second;
|
|
if (mgd->super_version->Unref()) {
|
|
mgd->super_version->Cleanup();
|
|
superversions_to_delete.push_back(mgd->super_version);
|
|
}
|
|
}
|
|
mutex_.Unlock();
|
|
|
|
for (auto td : superversions_to_delete) {
|
|
delete td;
|
|
}
|
|
for (auto mgd : multiget_cf_data) {
|
|
delete mgd.second;
|
|
}
|
|
|
|
RecordTick(stats_, NUMBER_MULTIGET_CALLS);
|
|
RecordTick(stats_, NUMBER_MULTIGET_KEYS_READ, num_keys);
|
|
RecordTick(stats_, NUMBER_MULTIGET_BYTES_READ, bytes_read);
|
|
MeasureTime(stats_, BYTES_PER_MULTIGET, bytes_read);
|
|
PERF_COUNTER_ADD(multiget_read_bytes, bytes_read);
|
|
PERF_TIMER_STOP(get_post_process_time);
|
|
|
|
return stat_list;
|
|
}
|
|
|
|
// Creates one column family and, if that succeeds, rewrites the options
// file. `handle` must not be null. Returns the creation error if creation
// failed, otherwise the result of writing the options file.
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
                                  const std::string& column_family,
                                  ColumnFamilyHandle** handle) {
  assert(handle != nullptr);
  Status create_status =
      CreateColumnFamilyImpl(cf_options, column_family, handle);
  if (!create_status.ok()) {
    return create_status;
  }
  return WriteOptionsFile(true /*need_mutex_lock*/,
                          true /*need_enter_write_thread*/);
}
|
|
|
|
// Creates one column family per entry of `column_family_names`, all with the
// same `cf_options`. Creation stops at the first failure. `handles` is
// cleared first and then receives a handle for every family that was
// created. The options file is rewritten if at least one family was created;
// a failure to write it is reported only when every creation succeeded.
Status DBImpl::CreateColumnFamilies(
    const ColumnFamilyOptions& cf_options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();

  Status s;
  for (const std::string& cf_name : column_family_names) {
    ColumnFamilyHandle* new_handle;
    s = CreateColumnFamilyImpl(cf_options, cf_name, &new_handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(new_handle);
  }

  // `handles` started out empty, so it is non-empty exactly when at least
  // one column family was created.
  if (!handles->empty()) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}
|
|
|
|
// Creates one column family per descriptor, each with its own options.
// Creation stops at the first failure. `handles` is cleared first and then
// receives a handle for every family that was created. The options file is
// rewritten if at least one family was created; a failure to write it is
// reported only when every creation succeeded.
Status DBImpl::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  handles->clear();

  Status s;
  bool created_any = false;
  for (const ColumnFamilyDescriptor& descriptor : column_families) {
    ColumnFamilyHandle* new_handle;
    s = CreateColumnFamilyImpl(descriptor.options, descriptor.name,
                               &new_handle);
    if (!s.ok()) {
      break;
    }
    handles->push_back(new_handle);
    created_any = true;
  }

  if (created_any) {
    Status persist_options_status = WriteOptionsFile(
        true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
    if (s.ok() && !persist_options_status.ok()) {
      s = persist_options_status;
    }
  }
  return s;
}
|
|
|
|
// Creates a single column family without rewriting the options file (the
// public Create* wrappers do that).
//
// `*handle` is set to nullptr up front and to a newly allocated
// ColumnFamilyHandleImpl on success. Fails with InvalidArgument if a column
// family named `column_family_name` already exists, or with whatever
// CheckCompressionSupported / CheckConcurrentWritesSupported / LogAndApply
// report.
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
                                      const std::string& column_family_name,
                                      ColumnFamilyHandle** handle) {
  Status s;
  *handle = nullptr;

  s = CheckCompressionSupported(cf_options);
  if (s.ok() && immutable_db_options_.allow_concurrent_memtable_write) {
    s = CheckConcurrentWritesSupported(cf_options);
  }
  if (!s.ok()) {
    return s;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  {
    InstrumentedMutexLock l(&mutex_);

    if (versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name) !=
        nullptr) {
      return Status::InvalidArgument("Column family already exists");
    }
    VersionEdit edit;
    edit.AddColumnFamily(column_family_name);
    uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID();
    edit.SetColumnFamily(new_id);
    edit.SetLogNumber(logfile_number_);
    edit.SetComparatorName(cf_options.comparator->Name());

    {  // write thread
      WriteThread::Writer w;
      write_thread_.EnterUnbatched(&w, &mutex_);
      // LogAndApply will both write the creation in MANIFEST and create
      // ColumnFamilyData object
      s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
                                 &mutex_, directories_.GetDbDir(), false,
                                 &cf_options);
      write_thread_.ExitUnbatched(&w);
    }
    if (s.ok()) {
      single_column_family_mode_ = false;
      auto* cfd =
          versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
      assert(cfd != nullptr);
      InstallSuperVersionAndScheduleWork(
          cfd, &sv_context, *cfd->GetLatestMutableCFOptions());

      // One column family without snapshot support disables snapshots for
      // the whole DB.
      if (!cfd->mem()->IsSnapshotSupported()) {
        is_snapshot_supported_ = false;
      }

      cfd->set_initialized();

      *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "Created column family [%s] (ID %u)",
                     column_family_name.c_str(), (unsigned)cfd->GetID());
    } else {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Creating column family [%s] FAILED -- %s",
                      column_family_name.c_str(), s.ToString().c_str());
    }
  }  // InstrumentedMutexLock l(&mutex_)

  sv_context.Clean();
  // this is outside the mutex
  if (s.ok()) {
    NewThreadStatusCfInfo(
        reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
  }
  return s;
}
|
|
|
|
// Drops one column family and, if that succeeds, rewrites the options file.
// Returns the drop error if the drop failed, otherwise the result of writing
// the options file.
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
  assert(column_family != nullptr);
  Status drop_status = DropColumnFamilyImpl(column_family);
  if (!drop_status.ok()) {
    return drop_status;
  }
  return WriteOptionsFile(true /*need_mutex_lock*/,
                          true /*need_enter_write_thread*/);
}
|
|
|
|
// Drops the given column families in order, stopping at the first failure.
// The options file is rewritten if at least one drop succeeded; a failure to
// write it is reported only when every drop succeeded.
Status DBImpl::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  Status s;
  bool dropped_any = false;
  for (auto* cf_handle : column_families) {
    s = DropColumnFamilyImpl(cf_handle);
    if (!s.ok()) {
      break;
    }
    dropped_any = true;
  }

  if (!dropped_any) {
    return s;
  }

  Status persist_options_status = WriteOptionsFile(
      true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
  if (s.ok() && !persist_options_status.ok()) {
    s = persist_options_status;
  }
  return s;
}
|
|
|
|
// Drops a single column family without rewriting the options file (the
// public Drop* wrappers do that).
//
// Fails with InvalidArgument for the default column family (id 0) or for a
// family that is already dropped; otherwise returns the LogAndApply status.
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  if (cfd->GetID() == 0) {
    return Status::InvalidArgument("Can't drop default column family");
  }

  const bool cf_support_snapshot = cfd->mem()->IsSnapshotSupported();

  VersionEdit edit;
  edit.DropColumnFamily();
  edit.SetColumnFamily(cfd->GetID());

  Status s;
  {
    InstrumentedMutexLock l(&mutex_);
    if (cfd->IsDropped()) {
      s = Status::InvalidArgument("Column family already dropped!\n");
    } else {
      {
        // we drop column family from a single write thread
        WriteThread::Writer w;
        write_thread_.EnterUnbatched(&w, &mutex_);
        s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                   &edit, &mutex_);
        write_thread_.ExitUnbatched(&w);
      }
      if (s.ok()) {
        // The dropped family's write buffers no longer count towards the
        // total in-memory state.
        auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
        max_total_in_memory_state_ -=
            mutable_cf_options->write_buffer_size *
            mutable_cf_options->max_write_buffer_number;
      }
    }

    if (!cf_support_snapshot) {
      // Dropped Column Family doesn't support snapshot. Need to recalculate
      // is_snapshot_supported_ from the families that are still alive.
      bool all_live_cfs_support_snapshot = true;
      for (auto* other_cfd : *versions_->GetColumnFamilySet()) {
        if (other_cfd->IsDropped()) {
          continue;
        }
        if (!other_cfd->mem()->IsSnapshotSupported()) {
          all_live_cfs_support_snapshot = false;
          break;
        }
      }
      is_snapshot_supported_ = all_live_cfs_support_snapshot;
    }
  }

  if (!s.ok()) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Dropping column family with id %u FAILED -- %s\n",
                    cfd->GetID(), s.ToString().c_str());
    return s;
  }

  // Note that here we erase the associated cf_info of the to-be-dropped
  // cfd before its ref-count goes to zero to avoid having to erase cf_info
  // later inside db_mutex.
  EraseThreadStatusCfInfo(cfd);
  assert(cfd->IsDropped());
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Dropped column family with id %u\n", cfd->GetID());
  return s;
}
|
|
|
|
bool DBImpl::KeyMayExist(const ReadOptions& read_options,
|
|
ColumnFamilyHandle* column_family, const Slice& key,
|
|
std::string* value, bool* value_found) {
|
|
assert(value != nullptr);
|
|
if (value_found != nullptr) {
|
|
// falsify later if key-may-exist but can't fetch value
|
|
*value_found = true;
|
|
}
|
|
ReadOptions roptions = read_options;
|
|
roptions.read_tier = kBlockCacheTier; // read from block cache only
|
|
PinnableSlice pinnable_val;
|
|
auto s = GetImpl(roptions, column_family, key, &pinnable_val, value_found);
|
|
value->assign(pinnable_val.data(), pinnable_val.size());
|
|
|
|
// If block_cache is enabled and the index block of the table didn't
|
|
// not present in block_cache, the return value will be Status::Incomplete.
|
|
// In this case, key may still exist in the table.
|
|
return s.ok() || s.IsIncomplete();
|
|
}
|
|
|
|
// Creates a user-facing iterator over `column_family`.
//
// kPersistedTier reads are rejected with an error iterator. Otherwise the
// kind of iterator depends on read_options: a ManagedIterator when
// `managed` is set, a ForwardIterator wrapped in a DB iterator when
// `tailing` is set, and an arena-wrapped DB iterator otherwise. Under
// ROCKSDB_LITE the managed case yields an error iterator and the tailing
// case yields nullptr.
Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
                              ColumnFamilyHandle* column_family) {
  if (read_options.read_tier == kPersistedTier) {
    return NewErrorIterator(Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators."));
  }

  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  ReadCallback* read_callback = nullptr;  // No read callback provided.

  if (read_options.managed) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    return NewErrorIterator(Status::InvalidArgument(
        "Managed Iterators not supported in RocksDBLite."));
#else
    const bool can_use_managed = read_options.tailing ||
                                 read_options.snapshot != nullptr ||
                                 is_snapshot_supported_;
    if (!can_use_managed) {
      // Managed iter not supported
      return NewErrorIterator(Status::InvalidArgument(
          "Managed Iterators not supported without snapshots."));
    }
    return new ManagedIterator(this, read_options, cfd);
#endif
  }

  if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    // not supported in lite version
    return nullptr;
#else
    SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
    auto forward_iter = new ForwardIterator(this, read_options, cfd, sv);
    return NewDBIterator(
        env_, read_options, *cfd->ioptions(), cfd->user_comparator(),
        forward_iter, kMaxSequenceNumber,
        sv->mutable_cf_options.max_sequential_skip_in_iterations,
        read_callback);
#endif
  }

  // Note: no need to consider the special case of concurrent_prepare_ &&
  // seq_per_batch_ since NewIterator is overridden in WritePreparedTxnDB
  auto snapshot = read_options.snapshot != nullptr
                      ? read_options.snapshot->GetSequenceNumber()
                      : versions_->LastSequence();
  return NewIteratorImpl(read_options, cfd, snapshot, read_callback);
}
|
|
|
|
// Builds an ArenaWrappedDBIter for `cfd` that reads at sequence number
// `snapshot`.
//
// The goal is to generate the whole DB iterator tree in one continuous
// memory area so that it is cache friendly. ArenaWrappedDBIter inlines an
// arena, and every iterator of the tree is allocated from that arena in the
// order in which it is accessed when querying:
//
//   ArenaWrappedDBIter
//     -> DBIter
//          -> MergingIterator
//               -> child iterator 1
//               -> child iterator 2
//               -> child iterator 3
//
// Laying out the iterators in access order makes it more likely that an
// iterator pointer is close to the iterator it points to, so that both are
// likely to share a cache line and/or page.
ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
                                            ColumnFamilyData* cfd,
                                            SequenceNumber snapshot,
                                            ReadCallback* read_callback,
                                            bool allow_blob) {
  SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);

  // The DB pointer is handed to the wrapper only when the caller did not
  // pin an explicit snapshot.
  DBImpl* const db_for_iter =
      (read_options.snapshot != nullptr) ? nullptr : this;

  ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
      env_, read_options, *cfd->ioptions(), snapshot,
      sv->mutable_cf_options.max_sequential_skip_in_iterations,
      sv->version_number, read_callback, db_for_iter, cfd, allow_blob);

  // The internal (merging) iterator lives in the wrapper's own arena and
  // feeds its range-deletion aggregator.
  InternalIterator* merged_iter =
      NewInternalIterator(read_options, cfd, sv, db_iter->GetArena(),
                          db_iter->GetRangeDelAggregator());
  db_iter->SetIterUnderDBIter(merged_iter);

  return db_iter;
}
|
|
|
|
// Creates one iterator per entry of `column_families`, appended to
// `iterators` (which is cleared first) in the same order.
//
// kPersistedTier reads are rejected with NotSupported. The iterator kind
// follows read_options exactly as in NewIterator(): managed, tailing, or the
// regular arena-wrapped iterator; in the regular case all iterators share
// one sequence number. Under ROCKSDB_LITE the managed and tailing modes
// return InvalidArgument.
Status DBImpl::NewIterators(
    const ReadOptions& read_options,
    const std::vector<ColumnFamilyHandle*>& column_families,
    std::vector<Iterator*>* iterators) {
  if (read_options.read_tier == kPersistedTier) {
    return Status::NotSupported(
        "ReadTier::kPersistedData is not yet supported in iterators.");
  }
  ReadCallback* read_callback = nullptr;  // No read callback provided.
  iterators->clear();
  iterators->reserve(column_families.size());

  if (read_options.managed) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Managed interator not supported in RocksDB lite");
#else
    const bool can_use_managed = read_options.tailing ||
                                 read_options.snapshot != nullptr ||
                                 is_snapshot_supported_;
    if (!can_use_managed) {
      return Status::InvalidArgument(
          "Managed interator not supported without snapshots");
    }
    for (auto* cf_handle : column_families) {
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
      iterators->push_back(new ManagedIterator(this, read_options, cfd));
    }
#endif
  } else if (read_options.tailing) {
#ifdef ROCKSDB_LITE
    return Status::InvalidArgument(
        "Tailing interator not supported in RocksDB lite");
#else
    for (auto* cf_handle : column_families) {
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
      SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
      auto forward_iter = new ForwardIterator(this, read_options, cfd, sv);
      iterators->push_back(NewDBIterator(
          env_, read_options, *cfd->ioptions(), cfd->user_comparator(),
          forward_iter, kMaxSequenceNumber,
          sv->mutable_cf_options.max_sequential_skip_in_iterations,
          read_callback));
    }
#endif
  } else {
    // Note: no need to consider the special case of concurrent_prepare_ &&
    // seq_per_batch_ since NewIterators is overridden in WritePreparedTxnDB
    auto snapshot = read_options.snapshot != nullptr
                        ? read_options.snapshot->GetSequenceNumber()
                        : versions_->LastSequence();
    for (auto* cf_handle : column_families) {
      auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cf_handle)->cfd();
      iterators->push_back(
          NewIteratorImpl(read_options, cfd, snapshot, read_callback));
    }
  }

  return Status::OK();
}
|
|
|
|
// Takes a regular snapshot, i.e. one that is not a write-conflict boundary.
const Snapshot* DBImpl::GetSnapshot() {
  return GetSnapshotImpl(false);
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Takes a snapshot that is flagged as a write-conflict boundary.
const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
  const bool is_write_conflict_boundary = true;
  return GetSnapshotImpl(is_write_conflict_boundary);
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Registers a new snapshot at the current sequence number and returns it.
// Returns nullptr if snapshots are not supported (is_snapshot_supported_ is
// false). The snapshot object is allocated and the current time is read
// before the DB mutex is taken.
const Snapshot* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary) {
  int64_t unix_time = 0;
  env_->GetCurrentTime(&unix_time);  // Ignore error
  SnapshotImpl* new_snapshot = new SnapshotImpl;

  InstrumentedMutexLock l(&mutex_);
  // returns null if the underlying memtable does not support snapshot.
  if (!is_snapshot_supported_) {
    delete new_snapshot;
    return nullptr;
  }

  auto snapshot_seq = concurrent_prepare_ && seq_per_batch_
                          ? versions_->LastToBeWrittenSequence()
                          : versions_->LastSequence();
  return snapshots_.New(new_snapshot, snapshot_seq, unix_time,
                        is_write_conflict_boundary);
}
|
|
|
|
void DBImpl::ReleaseSnapshot(const Snapshot* s) {
|
|
const SnapshotImpl* casted_s = reinterpret_cast<const SnapshotImpl*>(s);
|
|
{
|
|
InstrumentedMutexLock l(&mutex_);
|
|
snapshots_.Delete(casted_s);
|
|
uint64_t oldest_snapshot;
|
|
if (snapshots_.empty()) {
|
|
oldest_snapshot = versions_->LastSequence();
|
|
} else {
|
|
oldest_snapshot = snapshots_.oldest()->number_;
|
|
}
|
|
for (auto* cfd : *versions_->GetColumnFamilySet()) {
|
|
cfd->current()->storage_info()->UpdateOldestSnapshot(oldest_snapshot);
|
|
if (!cfd->current()
|
|
->storage_info()
|
|
->BottommostFilesMarkedForCompaction()
|
|
.empty()) {
|
|
SchedulePendingCompaction(cfd);
|
|
MaybeScheduleFlushOrCompaction();
|
|
}
|
|
}
|
|
}
|
|
delete casted_s;
|
|
}
|
|
|
|
// Returns true iff at least one snapshot is registered and the newest one
// has a sequence number greater than or equal to `sn`. Takes the DB mutex.
bool DBImpl::HasActiveSnapshotLaterThanSN(SequenceNumber sn) {
  InstrumentedMutexLock l(&mutex_);
  return !snapshots_.empty() &&
         snapshots_.newest()->GetSequenceNumber() >= sn;
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Fills `props` with the table properties of every table in the column
// family's current version. The version is referenced under the mutex,
// queried with the mutex released, and unreferenced under the mutex again.
Status DBImpl::GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
                                        TablePropertiesCollection* props) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  // Pin the current version.
  mutex_.Lock();
  auto pinned_version = cfd->current();
  pinned_version->Ref();
  mutex_.Unlock();

  auto result = pinned_version->GetPropertiesOfAllTables(props);

  // Release the pin.
  mutex_.Lock();
  pinned_version->Unref();
  mutex_.Unlock();

  return result;
}
|
|
|
|
// Fills `props` with the table properties that the column family's current
// version reports for the `n` ranges starting at `range`. The version is
// referenced under the mutex, queried with the mutex released, and
// unreferenced under the mutex again.
Status DBImpl::GetPropertiesOfTablesInRange(ColumnFamilyHandle* column_family,
                                            const Range* range, std::size_t n,
                                            TablePropertiesCollection* props) {
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  // Pin the current version.
  mutex_.Lock();
  auto pinned_version = cfd->current();
  pinned_version->Ref();
  mutex_.Unlock();

  auto result = pinned_version->GetPropertiesOfTablesInRange(range, n, props);

  // Release the pin.
  mutex_.Lock();
  pinned_version->Unref();
  mutex_.Unlock();

  return result;
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
const std::string& DBImpl::GetName() const {
|
|
return dbname_;
|
|
}
|
|
|
|
// Returns env_.
Env* DBImpl::GetEnv() const {
  return env_;
}
|
|
|
|
// Returns the combined DB options and latest column family options for
// `column_family`, assembled under the DB mutex.
Options DBImpl::GetOptions(ColumnFamilyHandle* column_family) const {
  InstrumentedMutexLock l(&mutex_);
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
                 cfd->GetLatestCFOptions());
}
|
|
|
|
// Returns the DB options built from the immutable and mutable DB option
// sets, assembled under the DB mutex.
DBOptions DBImpl::GetDBOptions() const {
  InstrumentedMutexLock l(&mutex_);
  return BuildDBOptions(immutable_db_options_, mutable_db_options_);
}
|
|
|
|
// Looks up `property` for `column_family` and writes it to `value` as a
// string. `value` is cleared first. Integer-valued properties are fetched
// via GetIntPropertyInternal() and converted with ToString(); string-valued
// properties are read from the column family's internal stats under the DB
// mutex. Returns false if the property is unknown or cannot be produced.
bool DBImpl::GetProperty(ColumnFamilyHandle* column_family,
                         const Slice& property, std::string* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  if (property_info == nullptr) {
    return false;
  }

  if (property_info->handle_int) {
    uint64_t int_value;
    const bool got_value =
        GetIntPropertyInternal(cfd, *property_info, false, &int_value);
    if (got_value) {
      *value = ToString(int_value);
    }
    return got_value;
  }

  if (property_info->handle_string) {
    InstrumentedMutexLock l(&mutex_);
    return cfd->internal_stats()->GetStringProperty(*property_info, property,
                                                    value);
  }

  // Shouldn't reach here since exactly one of handle_string and handle_int
  // should be non-nullptr.
  assert(false);
  return false;
}
|
|
|
|
// Looks up a map-valued `property` for `column_family`. `value` is cleared
// first. Returns false if the property is unknown or has no map handler;
// otherwise returns what the column family's internal stats report, read
// under the DB mutex.
bool DBImpl::GetMapProperty(ColumnFamilyHandle* column_family,
                            const Slice& property,
                            std::map<std::string, std::string>* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  value->clear();
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();

  const bool has_map_handler =
      property_info != nullptr && property_info->handle_map;
  if (!has_map_handler) {
    // Either the property is unknown or handle_map is not provided for it.
    return false;
  }

  InstrumentedMutexLock l(&mutex_);
  return cfd->internal_stats()->GetMapProperty(*property_info, property,
                                               value);
}
|
|
|
|
// Looks up an integer-valued `property` for `column_family` and stores it in
// `*value`. Returns false if the property is unknown or has no integer
// handler.
bool DBImpl::GetIntProperty(ColumnFamilyHandle* column_family,
                            const Slice& property, uint64_t* value) {
  const DBPropertyInfo* property_info = GetPropertyInfo(property);
  const bool has_int_handler =
      property_info != nullptr && property_info->handle_int != nullptr;
  if (!has_int_handler) {
    return false;
  }
  auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return GetIntPropertyInternal(cfd, *property_info, false, value);
}
|
|
|
|
// Evaluates an integer property for `cfd`. `property_info.handle_int` must
// be set.
//
// `is_locked` tells whether the caller already holds the DB mutex:
//  - For properties that do not need to run outside the mutex, the mutex is
//    taken here when the caller does not hold it.
//  - For properties marked need_out_of_mutex, the property is computed from
//    a SuperVersion's current version; the SuperVersion is referenced (and
//    returned afterwards) only when the caller does not hold the mutex.
bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
                                    const DBPropertyInfo& property_info,
                                    bool is_locked, uint64_t* value) {
  assert(property_info.handle_int != nullptr);

  if (property_info.need_out_of_mutex) {
    SuperVersion* sv =
        is_locked ? cfd->GetSuperVersion() : GetAndRefSuperVersion(cfd);

    const bool ok = cfd->internal_stats()->GetIntPropertyOutOfMutex(
        property_info, sv->current, value);

    if (!is_locked) {
      ReturnAndCleanupSuperVersion(cfd, sv);
    }
    return ok;
  }

  if (is_locked) {
    mutex_.AssertHeld();
    return cfd->internal_stats()->GetIntProperty(property_info, value, this);
  }

  InstrumentedMutexLock l(&mutex_);
  return cfd->internal_stats()->GetIntProperty(property_info, value, this);
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Clears the internal statistics of every initialized column family while
// holding the DB mutex. Always returns Status::OK().
Status DBImpl::ResetStats() {
  InstrumentedMutexLock guard(&mutex_);
  for (auto* column_family : *versions_->GetColumnFamilySet()) {
    if (!column_family->initialized()) {
      continue;
    }
    column_family->internal_stats()->Clear();
  }
  return Status::OK();
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
bool DBImpl::GetAggregatedIntProperty(const Slice& property,
|
|
uint64_t* aggregated_value) {
|
|
const DBPropertyInfo* property_info = GetPropertyInfo(property);
|
|
if (property_info == nullptr || property_info->handle_int == nullptr) {
|
|
return false;
|
|
}
|
|
|
|
uint64_t sum = 0;
|
|
{
|
|
// Needs mutex to protect the list of column families.
|
|
InstrumentedMutexLock l(&mutex_);
|
|
uint64_t value;
|
|
for (auto* cfd : *versions_->GetColumnFamilySet()) {
|
|
if (!cfd->initialized()) {
|
|
continue;
|
|
}
|
|
if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
|
|
sum += value;
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
*aggregated_value = sum;
|
|
return true;
|
|
}
|
|
|
|
// Returns the SuperVersion obtained from the column family's
// GetThreadLocalSuperVersion(), passing in the DB mutex.
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
  // TODO(ljin): consider using GetReferencedSuperVersion() directly
  SuperVersion* super_version = cfd->GetThreadLocalSuperVersion(&mutex_);
  return super_version;
}
|
|
|
|
// REQUIRED: this function should only be called on the write thread or if the
|
|
// mutex is held.
|
|
SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
|
|
auto column_family_set = versions_->GetColumnFamilySet();
|
|
auto cfd = column_family_set->GetColumnFamily(column_family_id);
|
|
if (!cfd) {
|
|
return nullptr;
|
|
}
|
|
|
|
return GetAndRefSuperVersion(cfd);
|
|
}
|
|
|
|
// Drops one reference on `sv`. If Unref() reports true, the SuperVersion is
// cleaned up under the DB mutex and then deleted outside of it. A release tick
// is recorded on every call; a cleanup tick only when the object was deleted.
void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
  const bool must_destroy = sv->Unref();
  if (must_destroy) {
    {
      // Cleanup() runs with the mutex held; the delete below does not.
      InstrumentedMutexLock guard(&mutex_);
      sv->Cleanup();
    }
    delete sv;
    RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
  }
  RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
}
|
|
|
|
// Offers `sv` back to the column family via ReturnThreadLocalSuperVersion();
// if that call reports false, releases the reference through
// CleanupSuperVersion() instead.
void DBImpl::ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd,
                                          SuperVersion* sv) {
  const bool accepted = cfd->ReturnThreadLocalSuperVersion(sv);
  if (accepted) {
    return;
  }
  CleanupSuperVersion(sv);
}
|
|
|
|
// REQUIRED: this function should only be called on the write thread.
|
|
void DBImpl::ReturnAndCleanupSuperVersion(uint32_t column_family_id,
|
|
SuperVersion* sv) {
|
|
auto column_family_set = versions_->GetColumnFamilySet();
|
|
auto cfd = column_family_set->GetColumnFamily(column_family_id);
|
|
|
|
// If SuperVersion is held, and we successfully fetched a cfd using
|
|
// GetAndRefSuperVersion(), it must still exist.
|
|
assert(cfd != nullptr);
|
|
ReturnAndCleanupSuperVersion(cfd, sv);
|
|
}
|
|
|
|
// REQUIRED: this function should only be called on the write thread or if the
|
|
// mutex is held.
|
|
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandle(uint32_t column_family_id) {
|
|
ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
|
|
|
|
if (!cf_memtables->Seek(column_family_id)) {
|
|
return nullptr;
|
|
}
|
|
|
|
return cf_memtables->GetColumnFamilyHandle();
|
|
}
|
|
|
|
// REQUIRED: mutex is NOT held.
|
|
ColumnFamilyHandle* DBImpl::GetColumnFamilyHandleUnlocked(
|
|
uint32_t column_family_id) {
|
|
ColumnFamilyMemTables* cf_memtables = column_family_memtables_.get();
|
|
|
|
InstrumentedMutexLock l(&mutex_);
|
|
|
|
if (!cf_memtables->Seek(column_family_id)) {
|
|
return nullptr;
|
|
}
|
|
|
|
return cf_memtables->GetColumnFamilyHandle();
|
|
}
|
|
|
|
// Estimates how many entries and how many bytes the memtables hold for the
// user-key range `range`. The figures from the mutable memtable (`sv->mem`)
// and from `sv->imm` are added together and written to `*count` / `*size`.
void DBImpl::GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
                                         const Range& range,
                                         uint64_t* const count,
                                         uint64_t* const size) {
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* column_family_data = handle->cfd();
  SuperVersion* super_version = GetAndRefSuperVersion(column_family_data);

  // Convert the user keys into the corresponding internal keys.
  InternalKey start_key(range.start, kMaxSequenceNumber, kValueTypeForSeek);
  InternalKey limit_key(range.limit, kMaxSequenceNumber, kValueTypeForSeek);

  MemTable::MemTableStats mutable_stats =
      super_version->mem->ApproximateStats(start_key.Encode(),
                                           limit_key.Encode());
  MemTable::MemTableStats immutable_stats =
      super_version->imm->ApproximateStats(start_key.Encode(),
                                           limit_key.Encode());

  *count = mutable_stats.count + immutable_stats.count;
  *size = mutable_stats.size + immutable_stats.size;

  ReturnAndCleanupSuperVersion(column_family_data, super_version);
}
|
|
|
|
// Fills `sizes[0..n)` with an approximate byte size for each of the `n`
// user-key ranges in `range`.
//
// `include_flags` selects the contributions: INCLUDE_FILES adds the estimate
// from the current Version, INCLUDE_MEMTABLES adds the estimates from the
// mutable memtable and from `sv->imm`. At least one flag must be set
// (asserted).
void DBImpl::GetApproximateSizes(ColumnFamilyHandle* column_family,
                                 const Range* range, int n, uint64_t* sizes,
                                 uint8_t include_flags) {
  assert(include_flags & DB::SizeApproximationFlags::INCLUDE_FILES ||
         include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES);

  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto* column_family_data = handle->cfd();
  SuperVersion* super_version = GetAndRefSuperVersion(column_family_data);
  Version* current_version = super_version->current;

  for (int idx = 0; idx < n; ++idx) {
    // Convert the user keys into the corresponding internal keys.
    InternalKey start_key(range[idx].start, kMaxSequenceNumber,
                          kValueTypeForSeek);
    InternalKey limit_key(range[idx].limit, kMaxSequenceNumber,
                          kValueTypeForSeek);

    sizes[idx] = 0;
    if (include_flags & DB::SizeApproximationFlags::INCLUDE_FILES) {
      sizes[idx] += versions_->ApproximateSize(
          current_version, start_key.Encode(), limit_key.Encode());
    }
    if (include_flags & DB::SizeApproximationFlags::INCLUDE_MEMTABLES) {
      sizes[idx] += super_version->mem
                        ->ApproximateStats(start_key.Encode(),
                                           limit_key.Encode())
                        .size;
      sizes[idx] += super_version->imm
                        ->ApproximateStats(start_key.Encode(),
                                           limit_key.Encode())
                        .size;
    }
  }

  ReturnAndCleanupSuperVersion(column_family_data, super_version);
}
|
|
|
|
std::list<uint64_t>::iterator
|
|
DBImpl::CaptureCurrentFileNumberInPendingOutputs() {
|
|
// We need to remember the iterator of our insert, because after the
|
|
// background job is done, we need to remove that element from
|
|
// pending_outputs_.
|
|
pending_outputs_.push_back(versions_->current_next_file_number());
|
|
auto pending_outputs_inserted_elem = pending_outputs_.end();
|
|
--pending_outputs_inserted_elem;
|
|
return pending_outputs_inserted_elem;
|
|
}
|
|
|
|
void DBImpl::ReleaseFileNumberFromPendingOutputs(
|
|
std::list<uint64_t>::iterator v) {
|
|
pending_outputs_.erase(v);
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Creates a transaction-log iterator positioned at sequence number `seq`.
//
// Records a GET_UPDATES_SINCE_CALLS tick, then returns NotFound if `seq` is
// greater than the last sequence known to the version set; otherwise delegates
// to the WAL manager.
Status DBImpl::GetUpdatesSince(
    SequenceNumber seq, unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options) {
  RecordTick(stats_, GET_UPDATES_SINCE_CALLS);

  const bool already_written = !(seq > versions_->LastSequence());
  if (!already_written) {
    return Status::NotFound("Requested sequence not yet written in the db");
  }
  return wal_manager_.GetUpdatesSince(seq, iter, read_options, versions_.get());
}
|
|
|
|
// Deletes the table file or archived WAL file called `name`.
//
// * Names that do not parse, or that are neither table nor log files, yield
//   InvalidArgument.
// * Log files: only archived logs may be deleted (NotSupported otherwise);
//   the file is removed from the WAL directory and that status is returned.
// * Table files: the file must be known to the version set, must have no
//   files in any deeper level, and -- if it lives in level 0 -- must be the
//   last entry of the level-0 file list. A file that is being compacted is
//   skipped with OK. Otherwise a VersionEdit removing the file is applied and
//   obsolete files are purged after the mutex is released.
Status DBImpl::DeleteFile(std::string name) {
  uint64_t file_number;
  FileType file_type;
  WalFileType wal_type;
  const bool parsed = ParseFileName(name, &file_number, &file_type, &wal_type);
  if (!parsed || (file_type != kTableFile && file_type != kLogFile)) {
    ROCKS_LOG_ERROR(immutable_db_options_.info_log, "DeleteFile %s failed.\n",
                    name.c_str());
    return Status::InvalidArgument("Invalid file name");
  }

  Status status;
  if (file_type == kLogFile) {
    // Only allow deleting archived log files
    if (wal_type != kArchivedLogFile) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed - not archived log.\n",
                      name.c_str());
      return Status::NotSupported("Delete only supported for archived logs");
    }
    const std::string wal_path =
        immutable_db_options_.wal_dir + "/" + name.c_str();
    status = env_->DeleteFile(wal_path);
    if (!status.ok()) {
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "DeleteFile %s failed -- %s.\n", name.c_str(),
                      status.ToString().c_str());
    }
    return status;
  }

  // From here on the target is a table file.
  int level;
  FileMetaData* file_meta;
  ColumnFamilyData* cfd;
  VersionEdit edit;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock guard(&mutex_);
    status =
        versions_->GetMetadataForFile(file_number, &level, &file_meta, &cfd);
    if (!status.ok()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed. File not found\n", name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File not found");
    }
    assert(level < cfd->NumberLevels());

    // If the file is being compacted no need to delete.
    if (file_meta->being_compacted) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "DeleteFile %s Skipped. File about to be compacted\n",
                     name.c_str());
      job_context.Clean();
      return Status::OK();
    }

    // Only the files in the last level can be deleted externally.
    // This is to make sure that any deletion tombstones are not
    // lost. Check that the level passed is the last level.
    auto* storage_info = cfd->current()->storage_info();
    for (int deeper = level + 1; deeper < cfd->NumberLevels(); ++deeper) {
      if (storage_info->NumLevelFiles(deeper) == 0) {
        continue;
      }
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s FAILED. File not in last level\n",
                     name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File not in last level");
    }

    // if level == 0, it has to be the oldest file
    if (level == 0 &&
        storage_info->LevelFiles(0).back()->fd.GetNumber() != file_number) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "DeleteFile %s failed ---"
                     " target file in level 0 must be the oldest.",
                     name.c_str());
      job_context.Clean();
      return Status::InvalidArgument("File in level 0, but not oldest");
    }

    edit.SetColumnFamily(cfd->GetID());
    edit.DeleteFile(level, file_number);
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          cfd, &job_context.superversion_context,
          *cfd->GetLatestMutableCFOptions());
    }
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}
|
|
|
|
// Deletes every table file in levels >= 1 of `column_family` whose user-key
// span lies entirely inside [*begin, *end]. A null `begin` / `end` leaves that
// side unbounded. Files currently being compacted are left alone.
//
// Selected files are temporarily marked `being_compacted` while the
// VersionEdit is applied, and unmarked afterwards. Obsolete files are purged
// after the mutex is released. Returns OK without applying anything when no
// file qualifies; otherwise returns the LogAndApply status.
Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family,
                                  const Slice* begin, const Slice* end) {
  Status status;
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  ColumnFamilyData* cfd = handle->cfd();
  VersionEdit edit;
  std::vector<FileMetaData*> deleted_files;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  {
    InstrumentedMutexLock guard(&mutex_);
    Version* input_version = cfd->current();

    auto* vstorage = input_version->storage_info();
    for (int level = 1; level < cfd->NumberLevels(); level++) {
      if (vstorage->LevelFiles(level).empty() ||
          !vstorage->OverlapInLevel(level, begin, end)) {
        continue;
      }

      // Translate the optional user-key bounds into internal keys.
      InternalKey begin_storage, end_storage;
      InternalKey* begin_key = nullptr;
      InternalKey* end_key = nullptr;
      if (begin != nullptr) {
        begin_storage.SetMinPossibleForUserKey(*begin);
        begin_key = &begin_storage;
      }
      if (end != nullptr) {
        end_storage.SetMaxPossibleForUserKey(*end);
        end_key = &end_storage;
      }

      std::vector<FileMetaData*> level_files;
      vstorage->GetOverlappingInputs(level, begin_key, end_key, &level_files,
                                     -1, nullptr, false);

      for (FileMetaData* candidate : level_files) {
        // Keep only files fully contained in the requested range.
        const bool starts_inside =
            (begin == nullptr) ||
            (cfd->internal_comparator().user_comparator()->Compare(
                 candidate->smallest.user_key(), *begin) >= 0);
        if (!starts_inside) {
          continue;
        }
        const bool ends_inside =
            (end == nullptr) ||
            (cfd->internal_comparator().user_comparator()->Compare(
                 candidate->largest.user_key(), *end) <= 0);
        if (!ends_inside) {
          continue;
        }
        if (candidate->being_compacted) {
          continue;
        }

        edit.SetColumnFamily(cfd->GetID());
        edit.DeleteFile(level, candidate->fd.GetNumber());
        deleted_files.push_back(candidate);
        candidate->being_compacted = true;
      }
    }

    if (edit.GetDeletedFiles().empty()) {
      job_context.Clean();
      return Status::OK();
    }

    input_version->Ref();
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit, &mutex_, directories_.GetDbDir());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(
          cfd, &job_context.superversion_context,
          *cfd->GetLatestMutableCFOptions());
    }
    // Undo the temporary marking done while selecting files.
    for (auto* removed : deleted_files) {
      removed->being_compacted = false;
    }
    input_version->Unref();
    FindObsoleteFiles(&job_context, false);
  }  // lock released here

  LogFlush(immutable_db_options_.info_log);
  // remove files outside the db-lock
  if (job_context.HaveSomethingToDelete()) {
    // Call PurgeObsoleteFiles() without holding mutex.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();
  return status;
}
|
|
|
|
// Asks the version set to fill `*metadata` with the live files' metadata,
// holding the DB mutex for the duration of the call.
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
  InstrumentedMutexLock guard(&mutex_);
  versions_->GetLiveFilesMetaData(metadata);
}
|
|
|
|
// Fills `*cf_meta` from the current Version of `column_family`, read through a
// SuperVersion that is acquired before and handed back after the query.
// `column_family` must not be null (asserted).
void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
                                     ColumnFamilyMetaData* cf_meta) {
  assert(column_family);
  auto* handle = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto* column_family_data = handle->cfd();
  auto* super_version = GetAndRefSuperVersion(column_family_data);
  super_version->current->GetColumnFamilyMetaData(cf_meta);
  ReturnAndCleanupSuperVersion(column_family_data, super_version);
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Compares every live file recorded by the version set with what the Env
// reports on disk. Requires the DB mutex to be held (asserted).
//
// For each file the size is looked up under its regular path and, if that
// fails, under the name produced by Rocks2LevelTableFileName(). Inaccessible
// files and size mismatches are collected into one message. Returns OK when
// nothing was collected, Corruption with the combined message otherwise.
Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();
  std::vector<LiveFileMetaData> live_files;
  versions_->GetLiveFilesMetaData(&live_files);

  std::string problems;
  for (const auto& file : live_files) {
    // file.name has a leading "/".
    std::string path = file.db_path + file.name;

    uint64_t actual_size = 0;
    Status lookup = env_->GetFileSize(path, &actual_size);
    if (!lookup.ok() &&
        env_->GetFileSize(Rocks2LevelTableFileName(path), &actual_size).ok()) {
      lookup = Status::OK();
    }

    if (!lookup.ok()) {
      problems += "Can't access " + file.name + ": " + lookup.ToString() + "\n";
      continue;
    }
    if (actual_size != file.size) {
      problems += "Sst file size mismatch: " + path +
                  ". Size recorded in manifest " + ToString(file.size) +
                  ", actual size " + ToString(actual_size) + "\n";
    }
  }

  if (problems.size() != 0) {
    return Status::Corruption(problems);
  }
  return Status::OK();
}
|
|
|
|
// Reads the database identity from the IDENTITY file into `identity`.
//
// The whole file is read in one call sized by the Env-reported file size; a
// single trailing '\n' is stripped. Returns the first non-OK status from
// opening the file, querying its size, or reading it; `identity` is only
// modified when all three succeed.
//
// The read buffer lives on the heap. The previous implementation used
// alloca(file_size), which places a buffer whose size is dictated by an
// on-disk file on the stack and overflows it -- with no error indication --
// if that file is unexpectedly large.
Status DBImpl::GetDbIdentity(std::string& identity) const {
  std::string idfilename = IdentityFileName(dbname_);
  const EnvOptions soptions;
  unique_ptr<SequentialFileReader> id_file_reader;
  Status s;
  {
    unique_ptr<SequentialFile> idfile;
    s = env_->NewSequentialFile(idfilename, &idfile, soptions);
    if (!s.ok()) {
      return s;
    }
    id_file_reader.reset(new SequentialFileReader(std::move(idfile)));
  }

  uint64_t file_size;
  s = env_->GetFileSize(idfilename, &file_size);
  if (!s.ok()) {
    return s;
  }

  // Scratch space for Read(); `id` may point into it, so it must outlive the
  // copy into `identity` below. &buffer[0] is valid even for an empty string.
  std::string buffer(static_cast<size_t>(file_size), '\0');
  Slice id;
  s = id_file_reader->Read(static_cast<size_t>(file_size), &id, &buffer[0]);
  if (!s.ok()) {
    return s;
  }
  identity.assign(id.ToString());
  // If last character is '\n' remove it from identity
  if (identity.size() > 0 && identity.back() == '\n') {
    identity.pop_back();
  }
  return s;
}
|
|
|
|
// Default implementation -- returns not supported status
|
|
Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
|
|
const std::string& column_family_name,
|
|
ColumnFamilyHandle** handle) {
|
|
return Status::NotSupported("");
|
|
}
|
|
|
|
// Base-class default for bulk creation by name: all arguments are ignored and
// NotSupported (with an empty message) is returned.
Status DB::CreateColumnFamilies(
    const ColumnFamilyOptions& cf_options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  return Status::NotSupported("");
}
|
|
|
|
// Base-class default for bulk creation by descriptor: all arguments are
// ignored and NotSupported (with an empty message) is returned.
Status DB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  return Status::NotSupported("");
}
|
|
|
|
// Base-class default: dropping a column family is not supported. The handle
// is ignored and NotSupported (with an empty message) is returned.
Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) {
  return Status::NotSupported("");
}
|
|
|
|
// Base-class default for bulk drop: the handles are ignored and NotSupported
// (with an empty message) is returned.
Status DB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  return Status::NotSupported("");
}
|
|
|
|
// Deletes `column_family` and returns OK.
Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
  delete column_family;
  return Status::OK();
}
|
|
|
|
// Out-of-line destructor definition; nothing to release here.
DB::~DB() {}
|
|
|
|
// Lists the column families of the database at `name` into
// `*column_families` by delegating to VersionSet::ListColumnFamilies(), using
// the Env taken from `db_options`.
Status DB::ListColumnFamilies(const DBOptions& db_options,
                              const std::string& name,
                              std::vector<std::string>* column_families) {
  Env* const env = db_options.env;
  return VersionSet::ListColumnFamilies(column_families, name, env);
}
|
|
|
|
// Out-of-line destructor definition; nothing to release here.
Snapshot::~Snapshot() {}
|
|
|
|
// Removes the files belonging to the database at `dbname`.
//
// The database lock file is acquired first; if that fails its status is
// returned and nothing is deleted. With the lock held, the function deletes,
// in order: recognised files in `dbname` (recursing into meta databases),
// table files in each of `options.db_paths`, log files in the WAL directory
// when it differs from `dbname`, and log files in the archive directory. It
// then removes the archive directory, the lock file, `dbname` and the WAL
// directory on a best-effort basis.
//
// Returns the first deletion error encountered, or OK.
Status DestroyDB(const std::string& dbname, const Options& options) {
  const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
  Env* env = soptions.env;
  std::vector<std::string> filenames;

  // Ignore error in case directory does not exist
  env->GetChildren(dbname, &filenames);

  FileLock* lock;
  const std::string lockname = LockFileName(dbname);
  Status result = env->LockFile(lockname, &lock);
  if (!result.ok()) {
    return result;
  }

  // Remember only the first failure; later ones do not overwrite it.
  auto note_failure = [&result](const Status& outcome) {
    if (result.ok() && !outcome.ok()) {
      result = outcome;
    }
  };

  uint64_t number;
  FileType type;
  InfoLogPrefix info_log_prefix(!soptions.db_log_dir.empty(), dbname);

  // Files directly inside the DB directory.
  for (const std::string& child : filenames) {
    if (!ParseFileName(child, &number, info_log_prefix.prefix, &type) ||
        type == kDBLockFile) {  // Lock file will be deleted at end
      continue;
    }
    std::string path_to_delete = dbname + "/" + child;
    if (type == kMetaDatabase) {
      note_failure(DestroyDB(path_to_delete, options));
    } else if (type == kTableFile) {
      note_failure(DeleteSSTFile(&soptions, path_to_delete, 0));
    } else {
      note_failure(env->DeleteFile(path_to_delete));
    }
  }

  // Table files stored in the additional db_paths.
  for (size_t path_id = 0; path_id < options.db_paths.size(); path_id++) {
    const auto& db_path = options.db_paths[path_id];
    env->GetChildren(db_path.path, &filenames);
    for (const std::string& child : filenames) {
      if (ParseFileName(child, &number, &type) && type == kTableFile) {
        std::string table_path = db_path.path + "/" + child;
        note_failure(DeleteSSTFile(&soptions, table_path,
                                   static_cast<uint32_t>(path_id)));
      }
    }
  }

  // The WAL directory is only listed separately when it is not the DB
  // directory; the archive directory is derived from whichever one is used.
  std::vector<std::string> walDirFiles;
  std::string archivedir = ArchivalDirectory(dbname);
  if (dbname != soptions.wal_dir) {
    env->GetChildren(soptions.wal_dir, &walDirFiles);
    archivedir = ArchivalDirectory(soptions.wal_dir);
  }

  // Delete log files in the WAL dir
  for (const auto& file : walDirFiles) {
    if (ParseFileName(file, &number, &type) && type == kLogFile) {
      note_failure(env->DeleteFile(LogFileName(soptions.wal_dir, number)));
    }
  }

  // Delete archival files.
  std::vector<std::string> archiveFiles;
  env->GetChildren(archivedir, &archiveFiles);
  for (const std::string& archived : archiveFiles) {
    if (ParseFileName(archived, &number, &type) && type == kLogFile) {
      note_failure(env->DeleteFile(archivedir + "/" + archived));
    }
  }

  // ignore case where no archival directory is present
  env->DeleteDir(archivedir);

  env->UnlockFile(lock);  // Ignore error since state is already gone
  env->DeleteFile(lockname);
  env->DeleteDir(dbname);  // Ignore error in case dir contains other files
  env->DeleteDir(soptions.wal_dir);

  return result;
}
|
|
|
|
// Persists the current DB and column-family options to a new OPTIONS file.
//
// `need_mutex_lock`: when true the DB mutex is acquired here; when false the
// caller must already hold it (asserted). In both cases the mutex is released
// before the file is written, and it is re-acquired afterwards only when the
// caller held it on entry.
// `need_enter_write_thread`: when true the write thread is entered unbatched
// while the options are collected and written, and exited afterwards.
//
// A persistence failure is logged; it is turned into an IOError only when
// `fail_if_options_file_error` is set. In ROCKSDB_LITE builds the function
// does nothing and returns OK.
Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
                                bool need_enter_write_thread) {
#ifndef ROCKSDB_LITE
  WriteThread::Writer writer;
  if (!need_mutex_lock) {
    mutex_.AssertHeld();
  } else {
    mutex_.Lock();
  }
  if (need_enter_write_thread) {
    write_thread_.EnterUnbatched(&writer, &mutex_);
  }

  std::vector<std::string> cf_names;
  std::vector<ColumnFamilyOptions> cf_opts;

  // This part requires mutex to protect the column family options
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (!cfd->IsDropped()) {
      cf_names.push_back(cfd->GetName());
      cf_opts.push_back(cfd->GetLatestCFOptions());
    }
  }

  // Unlock during expensive operations. New writes cannot get here
  // because the single write thread ensures all new writes get queued.
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  mutex_.Unlock();

  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");

  // Write to a temporary name first, then rename into place.
  std::string temp_file_name =
      TempOptionsFileName(GetName(), versions_->NewFileNumber());
  Status persist_status = PersistRocksDBOptions(db_options, cf_names, cf_opts,
                                                temp_file_name, GetEnv());
  if (persist_status.ok()) {
    persist_status = RenameTempFileToOptionsFile(temp_file_name);
  }

  // restore lock
  if (!need_mutex_lock) {
    mutex_.Lock();
  }
  if (need_enter_write_thread) {
    write_thread_.ExitUnbatched(&writer);
  }

  if (!persist_status.ok()) {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "Unnable to persist options -- %s",
                   persist_status.ToString().c_str());
    if (immutable_db_options_.fail_if_options_file_error) {
      return Status::IOError("Unable to persist options.",
                             persist_status.ToString().c_str());
    }
  }
#endif  // !ROCKSDB_LITE
  return Status::OK();
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
namespace {
|
|
void DeleteOptionsFilesHelper(const std::map<uint64_t, std::string>& filenames,
|
|
const size_t num_files_to_keep,
|
|
const std::shared_ptr<Logger>& info_log,
|
|
Env* env) {
|
|
if (filenames.size() <= num_files_to_keep) {
|
|
return;
|
|
}
|
|
for (auto iter = std::next(filenames.begin(), num_files_to_keep);
|
|
iter != filenames.end(); ++iter) {
|
|
if (!env->DeleteFile(iter->second).ok()) {
|
|
ROCKS_LOG_WARN(info_log, "Unable to delete options file %s",
|
|
iter->second.c_str());
|
|
}
|
|
}
|
|
}
|
|
} // namespace
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
// Deletes all but the two most recent OPTIONS files in the DB directory.
//
// Returns the GetChildren() error if the directory cannot be listed;
// otherwise OK (individual deletion failures are only logged by the helper).
// In ROCKSDB_LITE builds this is a no-op that returns OK.
Status DBImpl::DeleteObsoleteOptionsFiles() {
#ifndef ROCKSDB_LITE
  std::vector<std::string> children;
  Status s = GetEnv()->GetChildren(GetName(), &children);
  if (!s.ok()) {
    return s;
  }

  // An ordered map keyed by (max - file number) keeps the file names sorted
  // from the newest to the oldest.
  std::map<uint64_t, std::string> newest_first;
  for (auto& child : children) {
    uint64_t file_number;
    FileType type;
    if (!ParseFileName(child, &file_number, &type) || type != kOptionsFile) {
      continue;
    }
    const uint64_t descending_key =
        std::numeric_limits<uint64_t>::max() - file_number;
    newest_first.insert({descending_key, GetName() + "/" + child});
  }

  // Keeps the latest 2 Options file
  const size_t kNumOptionsFilesKept = 2;
  DeleteOptionsFilesHelper(newest_first, kNumOptionsFilesKept,
                           immutable_db_options_.info_log, GetEnv());
  return Status::OK();
#else
  return Status::OK();
#endif  // !ROCKSDB_LITE
}
|
|
|
|
// Renames the temporary options file `file_name` to a freshly numbered
// OPTIONS file, then prunes old OPTIONS files.
//
// A new file number is allocated and stored in
// versions_->options_file_number_ before the rename is attempted. The rename
// status is returned; pruning runs regardless of that status and its own
// status is ignored.
// NOTE(review): the rename is attempted exactly once -- no retry on a name
// conflict is implemented here.
// In ROCKSDB_LITE builds this is a no-op that returns OK.
Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
#ifndef ROCKSDB_LITE
  versions_->options_file_number_ = versions_->NewFileNumber();
  const std::string final_name =
      OptionsFileName(GetName(), versions_->options_file_number_);

  Status rename_status = GetEnv()->RenameFile(file_name, final_name);

  DeleteObsoleteOptionsFiles();
  return rename_status;
#else
  return Status::OK();
#endif  // !ROCKSDB_LITE
}
|
|
|
|
#ifdef ROCKSDB_USING_THREAD_STATUS
|
|
|
|
// Registers `cfd` with ThreadStatusUtil when thread tracking is enabled in the
// DB options; otherwise does nothing.
void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (!immutable_db_options_.enable_thread_tracking) {
    return;
  }
  ThreadStatusUtil::NewColumnFamilyInfo(this, cfd, cfd->GetName(),
                                        cfd->ioptions()->env);
}
|
|
|
|
// Removes `cfd` from ThreadStatusUtil's bookkeeping when thread tracking is
// enabled in the DB options; otherwise does nothing.
void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {
  if (!immutable_db_options_.enable_thread_tracking) {
    return;
  }
  ThreadStatusUtil::EraseColumnFamilyInfo(cfd);
}
|
|
|
|
void DBImpl::EraseThreadStatusDbInfo() const {
|
|
if (immutable_db_options_.enable_thread_tracking) {
|
|
ThreadStatusUtil::EraseDatabaseInfo(this);
|
|
}
|
|
}
|
|
|
|
#else
|
|
// No-op variant used when ROCKSDB_USING_THREAD_STATUS is not defined.
void DBImpl::NewThreadStatusCfInfo(ColumnFamilyData* cfd) const {}
|
|
|
|
// No-op variant used when ROCKSDB_USING_THREAD_STATUS is not defined.
void DBImpl::EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const {}
|
|
|
|
// No-op variant used when ROCKSDB_USING_THREAD_STATUS is not defined.
void DBImpl::EraseThreadStatusDbInfo() const {}
|
|
#endif // ROCKSDB_USING_THREAD_STATUS
|
|
|
|
//
|
|
// A global method that can dump out the build version
|
|
void DumpRocksDBBuildVersion(Logger * log) {
|
|
#if !defined(IOS_CROSS_COMPILE)
|
|
// if we compile with Xcode, we don't run build_detect_version, so we don't
|
|
// generate util/build_version.cc
|
|
ROCKS_LOG_HEADER(log, "RocksDB version: %d.%d.%d\n", ROCKSDB_MAJOR,
|
|
ROCKSDB_MINOR, ROCKSDB_PATCH);
|
|
ROCKS_LOG_HEADER(log, "Git sha %s", rocksdb_build_git_sha);
|
|
ROCKS_LOG_HEADER(log, "Compile date %s", rocksdb_build_compile_date);
|
|
#endif
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Returns the earliest sequence number that can be served from the memtables
// of `sv` without consulting SST files.
//
// `sv->imm` is asked first (optionally including history); if it answers
// kMaxSequenceNumber, the mutable memtable's earliest sequence number is used
// instead. The result is asserted not to exceed the mutable memtable's value.
SequenceNumber DBImpl::GetEarliestMemTableSequenceNumber(SuperVersion* sv,
                                                         bool include_history) {
  SequenceNumber earliest =
      sv->imm->GetEarliestSequenceNumber(include_history);
  const bool imm_gave_no_bound = (earliest == kMaxSequenceNumber);
  if (imm_gave_no_bound) {
    earliest = sv->mem->GetEarliestSequenceNumber();
  }
  assert(sv->mem->GetEarliestSequenceNumber() >= earliest);

  return earliest;
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Finds the sequence number of the most recent record for `key` visible in
// `sv`.
//
// Sources are consulted in order -- mutable memtable, `sv->imm`, the memtable
// history of `sv->imm`, and (unless `cache_only`) the SST files of the current
// Version -- stopping at the first one that yields a sequence number.
//
// Outputs:
//   *seq                  kMaxSequenceNumber if nothing was found.
//   *found_record_for_key set to true when a memtable stage found the key; in
//                         the SST stage it is set by Version::Get().
//   *is_blob_index        passed through to each lookup.
// Returns OK unless a lookup produced a status other than OK, NotFound or
// MergeInProgress; such a status is logged and returned.
Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                       bool cache_only, SequenceNumber* seq,
                                       bool* found_record_for_key,
                                       bool* is_blob_index) {
  Status s;
  MergeContext merge_context;
  RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(),
                                   kMaxSequenceNumber);

  ReadOptions read_options;
  SequenceNumber current_seq = versions_->LastSequence();
  LookupKey lkey(key, current_seq);

  *seq = kMaxSequenceNumber;
  *found_record_for_key = false;

  // Statuses that are a normal outcome of a lookup.
  auto is_tolerated = [](const Status& st) {
    return st.ok() || st.IsNotFound() || st.IsMergeInProgress();
  };

  // Stage 1: the latest (mutable) memtable.
  sv->mem->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
               read_options, nullptr /*read_callback*/, is_blob_index);
  if (!is_tolerated(s)) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTable::Get: %s\n",
                    s.ToString().c_str());
    return s;
  }
  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check immutable memtables
    *found_record_for_key = true;
    return Status::OK();
  }

  // Stage 2: the immutable memtables.
  sv->imm->Get(lkey, nullptr, &s, &merge_context, &range_del_agg, seq,
               read_options, nullptr /*read_callback*/, is_blob_index);
  if (!is_tolerated(s)) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from MemTableList::Get: %s\n",
                    s.ToString().c_str());
    return s;
  }
  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check memtable history
    *found_record_for_key = true;
    return Status::OK();
  }

  // Stage 3: the memtable history.
  sv->imm->GetFromHistory(lkey, nullptr, &s, &merge_context, &range_del_agg,
                          seq, read_options, is_blob_index);
  if (!is_tolerated(s)) {
    // unexpected error reading memtable.
    ROCKS_LOG_ERROR(
        immutable_db_options_.info_log,
        "Unexpected status returned from MemTableList::GetFromHistory: %s\n",
        s.ToString().c_str());
    return s;
  }
  if (*seq != kMaxSequenceNumber) {
    // Found a sequence number, no need to check SST files
    *found_record_for_key = true;
    return Status::OK();
  }

  // TODO(agiardullo): possible optimization: consider checking cached
  // SST files if cache_only=true?
  if (cache_only) {
    return Status::OK();
  }

  // Stage 4: the SST files of the current Version.
  sv->current->Get(read_options, lkey, nullptr, &s, &merge_context,
                   &range_del_agg, nullptr /* value_found */,
                   found_record_for_key, seq, nullptr /*read_callback*/,
                   is_blob_index);
  if (!is_tolerated(s)) {
    // unexpected error reading SST files
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                    "Unexpected status returned from Version::Get: %s\n",
                    s.ToString().c_str());
    return s;
  }

  return Status::OK();
}
|
|
|
|
// Ingests the SST files listed in `external_files` into `column_family`.
//
// Steps:
//   1. Reject ingest_behind requests when the DB was not opened with
//      allow_ingest_behind.
//   2. Under the mutex, fail fast on a pending background error and reserve
//      the current file number in pending_outputs_ so background cleanup does
//      not delete the files being ingested.
//   3. Prepare the ingestion job outside the mutex.
//   4. Under the mutex, with writes stopped: flush the memtable if the job
//      requires it, run the job, apply its VersionEdit and install a new
//      SuperVersion.
//   5. Release the pending_outputs_ reservation, clean up, and notify
//      listeners on success.
//
// Returns the first error encountered, or OK.
Status DBImpl::IngestExternalFile(
    ColumnFamilyHandle* column_family,
    const std::vector<std::string>& external_files,
    const IngestExternalFileOptions& ingestion_options) {
  Status status;
  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
  auto cfd = cfh->cfd();

  // Ingest should immediately fail if ingest_behind is requested,
  // but the DB doesn't support it.
  if (ingestion_options.ingest_behind) {
    if (!immutable_db_options_.allow_ingest_behind) {
      return Status::InvalidArgument(
          "Can't ingest_behind file in DB with allow_ingest_behind=false");
    }
  }

  ExternalSstFileIngestionJob ingestion_job(env_, versions_.get(), cfd,
                                            immutable_db_options_, env_options_,
                                            &snapshots_, ingestion_options);

  std::list<uint64_t>::iterator pending_output_elem;
  {
    InstrumentedMutexLock l(&mutex_);
    if (!bg_error_.ok()) {
      // Don't ingest files when there is a bg_error
      return bg_error_;
    }

    // Make sure that bg cleanup wont delete the files that we are ingesting
    pending_output_elem = CaptureCurrentFileNumberInPendingOutputs();
  }

  status = ingestion_job.Prepare(external_files);
  if (!status.ok()) {
    // The reservation made above must not outlive this call; without this
    // release the entry would stay in pending_outputs_ after a failed
    // Prepare(). pending_outputs_ is only touched with the mutex held.
    InstrumentedMutexLock l(&mutex_);
    ReleaseFileNumberFromPendingOutputs(pending_output_elem);
    return status;
  }

  SuperVersionContext sv_context(/* create_superversion */ true);
  TEST_SYNC_POINT("DBImpl::AddFile:Start");
  {
    // Lock db mutex
    InstrumentedMutexLock l(&mutex_);
    TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");

    // Stop writes to the DB by entering both write threads
    WriteThread::Writer w;
    write_thread_.EnterUnbatched(&w, &mutex_);
    WriteThread::Writer nonmem_w;
    if (concurrent_prepare_) {
      nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
    }

    num_running_ingest_file_++;

    // We cannot ingest a file into a dropped CF
    if (cfd->IsDropped()) {
      status = Status::InvalidArgument(
          "Cannot ingest an external file into a dropped CF");
    }

    // Figure out if we need to flush the memtable first
    if (status.ok()) {
      bool need_flush = false;
      status = ingestion_job.NeedsFlush(&need_flush);
      TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
                               &need_flush);
      if (status.ok() && need_flush) {
        // The flush runs without the mutex; writes are already stopped.
        mutex_.Unlock();
        status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */);
        mutex_.Lock();
      }
    }

    // Run the ingestion job
    if (status.ok()) {
      status = ingestion_job.Run();
    }

    // Install job edit [Mutex will be unlocked here]
    auto mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (status.ok()) {
      status =
          versions_->LogAndApply(cfd, *mutable_cf_options, ingestion_job.edit(),
                                 &mutex_, directories_.GetDbDir());
    }
    if (status.ok()) {
      InstallSuperVersionAndScheduleWork(cfd, &sv_context,
                                         *mutable_cf_options);
    }

    // Resume writes to the DB
    if (concurrent_prepare_) {
      nonmem_write_thread_.ExitUnbatched(&nonmem_w);
    }
    write_thread_.ExitUnbatched(&w);

    // Update stats
    if (status.ok()) {
      ingestion_job.UpdateStats();
    }

    ReleaseFileNumberFromPendingOutputs(pending_output_elem);

    // Wake up waiters once the last running ingestion has finished.
    num_running_ingest_file_--;
    if (num_running_ingest_file_ == 0) {
      bg_cv_.SignalAll();
    }

    TEST_SYNC_POINT("DBImpl::AddFile:MutexUnlock");
  }
  // mutex_ is unlocked here

  // Cleanup
  sv_context.Clean();
  ingestion_job.Cleanup(status);

  if (status.ok()) {
    NotifyOnExternalFileIngested(cfd, ingestion_job);
  }

  return status;
}
|
|
|
|
// Verifies the checksum of every SST file listed in the referenced
// SuperVersion of each column family that is initialized and not dropped.
//
// Returns the status of the first file whose verification fails; remaining
// files are skipped once a failure is seen. Returns the default-constructed
// Status if no verification call reported a failure.
Status DBImpl::VerifyChecksum() {
  Status result;
  // NOTE(review): verification runs with default-constructed Options and
  // EnvOptions rather than anything derived from this DB's configuration --
  // confirm that this is intended.
  Options default_options;
  EnvOptions default_env_options;

  // Take a reference on every eligible column family while holding the DB
  // mutex.
  std::vector<ColumnFamilyData*> pinned_cfds;
  {
    InstrumentedMutexLock lock(&mutex_);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      if (cfd->IsDropped() || !cfd->initialized()) {
        continue;
      }
      cfd->Ref();
      pinned_cfds.push_back(cfd);
    }
  }

  // Grab a referenced SuperVersion per column family; the file lists read
  // below come from these.
  std::vector<SuperVersion*> pinned_svs;
  for (ColumnFamilyData* cfd : pinned_cfds) {
    pinned_svs.push_back(cfd->GetReferencedSuperVersion(&mutex_));
  }

  // Walk every non-empty level of every SuperVersion, stopping at the first
  // file that fails verification.
  for (SuperVersion* sv : pinned_svs) {
    VersionStorageInfo* storage = sv->current->storage_info();
    for (int level = 0;
         level < storage->num_non_empty_levels() && result.ok(); ++level) {
      const auto& level_files = storage->LevelFilesBrief(level);
      for (size_t idx = 0; idx < level_files.num_files && result.ok();
           ++idx) {
        const auto& file_desc = level_files.files[idx].fd;
        std::string path =
            TableFileName(immutable_db_options_.db_paths,
                          file_desc.GetNumber(), file_desc.GetPathId());
        result = rocksdb::VerifySstFileChecksum(default_options,
                                                default_env_options, path);
      }
    }
    if (!result.ok()) {
      break;
    }
  }

  // Drop every reference taken above, under the DB mutex. This runs on both
  // the success and the failure path.
  {
    InstrumentedMutexLock lock(&mutex_);
    for (SuperVersion* sv : pinned_svs) {
      if (sv == nullptr || !sv->Unref()) {
        continue;
      }
      // Unref() returned true, so this SuperVersion is cleaned up and freed
      // here.
      sv->Cleanup();
      delete sv;
    }
    for (ColumnFamilyData* cfd : pinned_cfds) {
      // NOTE(review): the return value of cfd->Unref() is discarded, unlike
      // sv->Unref() above -- confirm nothing needs to be deleted when the
      // last reference is released here.
      cfd->Unref();
    }
  }
  return result;
}
|
|
|
|
// Reports each file ingested by `ingestion_job` to every listener in
// immutable_db_options_.listeners by calling OnExternalFileIngested once per
// (file, listener) pair.
//
// cfd:           column family the files were ingested into; its name is
//                copied into the notification.
// ingestion_job: job whose files_to_ingest() entries are reported.
//
// The body is compiled out when ROCKSDB_LITE is defined.
void DBImpl::NotifyOnExternalFileIngested(
    ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
#ifndef ROCKSDB_LITE
  const auto& listeners = immutable_db_options_.listeners;
  // Skip building notification payloads when there is nobody to notify.
  if (!listeners.empty()) {
    for (const IngestedFileInfo& ingested : ingestion_job.files_to_ingest()) {
      // One payload per ingested file, shared by all listeners.
      ExternalFileIngestionInfo event;
      event.cf_name = cfd->GetName();
      event.external_file_path = ingested.external_file_path;
      event.internal_file_path = ingested.internal_file_path;
      event.global_seqno = ingested.assigned_seqno;
      event.table_properties = ingested.table_properties;
      for (auto listener : listeners) {
        listener->OnExternalFileIngested(this, event);
      }
    }
  }
#endif
}
|
|
|
|
void DBImpl::WaitForIngestFile() {
|
|
mutex_.AssertHeld();
|
|
while (num_running_ingest_file_ > 0) {
|
|
bg_cv_.Wait();
|
|
}
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
} // namespace rocksdb
|