mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
320d9a8e8a
Summary: The patch replaces `std::map` with a sorted `std::vector` for `VersionStorageInfo::blob_files_` and preallocates the space for the `vector` before saving the `BlobFileMetaData` into the new `VersionStorageInfo` in `VersionBuilder::Rep::SaveBlobFilesTo`. These changes reduce the time the DB mutex is held while saving new `Version`s, and using a sorted `vector` also makes lookups faster thanks to better memory locality. In addition, the patch introduces helper methods `VersionStorageInfo::GetBlobFileMetaData` and `VersionStorageInfo::GetBlobFileMetaDataLB` that can be used by clients to perform lookups in the `vector`, and does some general cleanup in the parts of code where blob file metadata are used. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9526 Test Plan: Ran `make check` and the crash test script for a while. Performance was tested using a load-optimized benchmark (`fillseq` with vector memtable, no WAL) and small file sizes so that a significant number of files are produced: ``` numactl --interleave=all ./db_bench --benchmarks=fillseq --allow_concurrent_memtable_write=false --level0_file_num_compaction_trigger=4 --level0_slowdown_writes_trigger=20 --level0_stop_writes_trigger=30 --max_background_jobs=8 --max_write_buffer_number=8 --db=/data/ltamasi-dbbench --wal_dir=/data/ltamasi-dbbench --num=800000000 --num_levels=8 --key_size=20 --value_size=400 --block_size=8192 --cache_size=51539607552 --cache_numshardbits=6 --compression_max_dict_bytes=0 --compression_ratio=0.5 --compression_type=lz4 --bytes_per_sync=8388608 --cache_index_and_filter_blocks=1 --cache_high_pri_pool_ratio=0.5 --benchmark_write_rate_limit=0 --write_buffer_size=16777216 --target_file_size_base=16777216 --max_bytes_for_level_base=67108864 --verify_checksum=1 --delete_obsolete_files_period_micros=62914560 --max_bytes_for_level_multiplier=8 --statistics=0 --stats_per_interval=1 --stats_interval_seconds=20 --histogram=1 --memtablerep=skip_list --bloom_bits=10 
--open_files=-1 --subcompactions=1 --compaction_style=0 --min_level_to_compress=3 --level_compaction_dynamic_level_bytes=true --pin_l0_filter_and_index_blocks_in_cache=1 --soft_pending_compaction_bytes_limit=167503724544 --hard_pending_compaction_bytes_limit=335007449088 --min_level_to_compress=0 --use_existing_db=0 --sync=0 --threads=1 --memtablerep=vector --allow_concurrent_memtable_write=false --disable_wal=1 --enable_blob_files=1 --blob_file_size=16777216 --min_blob_size=0 --blob_compression_type=lz4 --enable_blob_garbage_collection=1 --seed=<some value> ``` Final statistics before the patch: ``` Cumulative writes: 0 writes, 700M keys, 0 commit groups, 0.0 writes per commit group, ingest: 284.62 GB, 121.27 MB/s Interval writes: 0 writes, 334K keys, 0 commit groups, 0.0 writes per commit group, ingest: 139.28 MB, 72.46 MB/s ``` With the patch: ``` Cumulative writes: 0 writes, 760M keys, 0 commit groups, 0.0 writes per commit group, ingest: 308.66 GB, 131.52 MB/s Interval writes: 0 writes, 445K keys, 0 commit groups, 0.0 writes per commit group, ingest: 185.35 MB, 93.15 MB/s ``` Total time to complete the benchmark is 2611 seconds with the patch, down from 2986 secs. Reviewed By: riversand963 Differential Revision: D34082728 Pulled By: ltamasi fbshipit-source-id: fc598abf676dce436734d06bb9d2d99a26a004fc
409 lines
13 KiB
C++
409 lines
13 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include <algorithm>
|
|
#include <cstdint>
|
|
#include <memory>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "db/job_context.h"
|
|
#include "db/version_set.h"
|
|
#include "file/file_util.h"
|
|
#include "file/filename.h"
|
|
#include "logging/logging.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/metadata.h"
|
|
#include "rocksdb/types.h"
|
|
#include "test_util/sync_point.h"
|
|
#include "util/file_checksum_helper.h"
|
|
#include "util/mutexlock.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
// Flushes the memtables of the column families so that their contents reach
// disk before a live-file listing is taken.
//
// REQUIRES: mutex_ held on entry. The mutex is released around each flush
// call and re-acquired afterwards, so it is held again when this returns.
//
// A "column family dropped" result from a flush is not treated as a failure;
// it is converted to OK. In the non-atomic path, any other failure stops the
// iteration and is returned to the caller.
Status DBImpl::FlushForGetLiveFiles() {
  mutex_.AssertHeld();

  Status flush_result;
  if (!immutable_db_options_.atomic_flush) {
    // Flush column families one at a time.
    for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
      if (cf->IsDropped()) {
        continue;
      }
      // Hold a reference across the unlocked region below.
      cf->Ref();
      mutex_.Unlock();
      flush_result =
          FlushMemTable(cf, FlushOptions(), FlushReason::kGetLiveFiles);
      TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
      TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
      mutex_.Lock();
      cf->UnrefAndTryDelete();
      if (flush_result.IsColumnFamilyDropped()) {
        // A dropped column family has nothing left to flush.
        flush_result = Status::OK();
      } else if (!flush_result.ok()) {
        break;
      }
    }
  } else {
    // Atomic flush: select the column families under the mutex, then flush
    // them together with the mutex released.
    autovector<ColumnFamilyData*> selected;
    SelectColumnFamiliesForAtomicFlush(&selected);
    mutex_.Unlock();
    flush_result = AtomicFlushMemTables(selected, FlushOptions(),
                                        FlushReason::kGetLiveFiles);
    if (flush_result.IsColumnFamilyDropped()) {
      flush_result = Status::OK();
    }
    mutex_.Lock();
  }

  versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
  return flush_result;
}
|
|
|
|
// Fills `ret` with the names of the live table and blob files of every
// non-dropped column family, followed by CURRENT, the MANIFEST and (when one
// is recorded) the OPTIONS file. Names are built with an empty directory
// prefix, i.e. they are relative to dbname_ rather than absolute paths.
//
// `*manifest_file_size` is reset to 0 up front and, on success, set to the
// manifest size read while the mutex is held.
//
// When `flush_memtable` is true the memtables are flushed first; a flush
// failure is logged and returned, and `ret` is left untouched in that case.
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
                            uint64_t* manifest_file_size,
                            bool flush_memtable) {
  *manifest_file_size = 0;

  mutex_.Lock();

  if (flush_memtable) {
    Status flush_status = FlushForGetLiveFiles();
    if (!flush_status.ok()) {
      // Drop the mutex before logging and bailing out.
      mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
                      flush_status.ToString().c_str());
      return flush_status;
    }
  }

  // Gather the file numbers of all live table and blob files.
  std::vector<uint64_t> table_numbers;
  std::vector<uint64_t> blob_numbers;
  for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
    if (!cf->IsDropped()) {
      cf->current()->AddLiveFiles(&table_numbers, &blob_numbers);
    }
  }

  ret.clear();
  // Three extra slots: CURRENT + MANIFEST + OPTIONS.
  const size_t expected_entries =
      table_numbers.size() + blob_numbers.size() + 3;
  ret.reserve(expected_entries);

  // Turn the file numbers into names relative to dbname_.
  for (const uint64_t number : table_numbers) {
    ret.push_back(MakeTableFileName("", number));
  }
  for (const uint64_t number : blob_numbers) {
    ret.push_back(BlobFileName("", number));
  }

  ret.push_back(CurrentFileName(""));
  ret.push_back(DescriptorFileName("", versions_->manifest_file_number()));

  // The OPTIONS file number is zero in read-write mode when writing the
  // OPTIONS file failed and the DB was configured with
  // `fail_if_options_file_error == false`. In read-only mode it is zero when
  // no OPTIONS file exists at all. No OPTIONS entry is recorded in those
  // cases.
  const uint64_t options_number = versions_->options_file_number();
  if (options_number != 0) {
    ret.push_back(OptionsFileName("", options_number));
  }

  // The manifest length must be read while the mutex is still held.
  *manifest_file_size = versions_->manifest_file_size();

  mutex_.Unlock();
  return Status::OK();
}
|
|
|
|
// Retrieves the sorted list of WAL files into `files` via the WAL manager
// and returns the WAL manager's status.
//
// If the caller has disabled file deletions, the returned files should be
// guaranteed not to be deleted until deletions are re-enabled. WalManager
// does not know which files are about to be purged, so this waits for all
// pending purges to finish first. No further purges are scheduled while
// deletions are disabled, so the wait terminates. Deletions are disabled
// here regardless, so that no file disappears in the middle of the scan and
// causes an IO error.
Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
  Status deletions_disabled = DisableFileDeletions();
  {
    InstrumentedMutexLock guard(&mutex_);
    const auto purge_in_flight = [this]() {
      return pending_purge_obsolete_files_ > 0 || bg_purge_scheduled_ > 0;
    };
    while (purge_in_flight()) {
      bg_cv_.Wait();
    }
  }

  Status scan_status = wal_manager_.GetSortedWalFiles(files);

  // DisableFileDeletions / EnableFileDeletions are not supported in a
  // read-only DB, so only undo the disable when it actually succeeded.
  if (!deletions_disabled.ok()) {
    assert(deletions_disabled.IsNotSupported());
  } else {
    Status reenable_status = EnableFileDeletions(/*force*/ false);
    assert(reenable_status.ok());
    reenable_status.PermitUncheckedError();
  }

  return scan_status;
}
|
|
|
|
// Looks up the WAL file whose number is the current logfile_number_ and
// hands it back through `current_log_file`. The number is read under mutex_;
// the WAL manager lookup itself runs after the mutex has been released.
Status DBImpl::GetCurrentWalFile(std::unique_ptr<LogFile>* current_log_file) {
  const uint64_t active_wal_number = [this]() -> uint64_t {
    InstrumentedMutexLock guard(&mutex_);
    return logfile_number_;
  }();

  return wal_manager_.GetLiveWalFile(active_wal_number, current_log_file);
}
|
|
|
|
// Builds a list describing every live file of the DB: table files, blob
// files, MANIFEST, CURRENT, OPTIONS (when one is recorded) and the WAL files
// that are still needed.
//
// `opts.wal_size_for_flush` controls whether memtables are flushed first
// (only consulted when 2PC is not allowed):
//   - port::kMaxUint64: never flush;
//   - > 0: skip the flush when the total size of the current WAL files is
//     below this threshold;
//   - otherwise: flush.
// `opts.include_checksum_info` controls whether the checksum fields of each
// entry are filled in.
//
// `*files` is cleared on entry and only receives the results on success, so
// callers never observe a partial list.
Status DBImpl::GetLiveFilesStorageInfo(
    const LiveFilesStorageInfoOptions& opts,
    std::vector<LiveFileStorageInfo>* files) {
  assert(files);
  files->clear();
  // Accumulate locally; moved into *files only at the very end.
  std::vector<LiveFileStorageInfo> results;

  // Appends a default-constructed entry and returns a reference to it. The
  // reference is only valid until the next append.
  const auto append_entry = [&results]() -> LiveFileStorageInfo& {
    results.emplace_back();
    return results.back();
  };

  // For files without a recorded checksum: if checksum info was requested,
  // mark it as unknown.
  const auto mark_checksum_unknown = [&opts](LiveFileStorageInfo& entry) {
    if (opts.include_checksum_info) {
      entry.file_checksum_func_name = kUnknownFileChecksumFuncName;
      entry.file_checksum = kUnknownFileChecksum;
    }
  };

  // For files whose checksum fields were just copied from metadata: an empty
  // function name means nothing was recorded, so substitute the "unknown"
  // markers.
  const auto default_empty_checksum = [](LiveFileStorageInfo& entry) {
    if (entry.file_checksum_func_name.empty()) {
      entry.file_checksum_func_name = kUnknownFileChecksumFuncName;
      entry.file_checksum = kUnknownFileChecksum;
    }
  };

  // NOTE: This implementation was largely migrated from Checkpoint.

  Status s;
  VectorLogPtr live_wal_files;
  bool flush_memtable = true;
  if (!immutable_db_options_.allow_2pc) {
    if (opts.wal_size_for_flush == port::kMaxUint64) {
      flush_memtable = false;
    } else if (opts.wal_size_for_flush > 0) {
      // If the outstanding log files are small, the flush is skipped.
      s = GetSortedWalFiles(live_wal_files);
      if (!s.ok()) {
        return s;
      }

      // Don't flush column families if the total log size is smaller than
      // wal_size_for_flush. The log files are copied instead.
      // We may be able to cover the 2PC case too.
      uint64_t wal_bytes = 0;
      for (auto& wal_file : live_wal_files) {
        wal_bytes += wal_file->SizeFileBytes();
      }
      if (wal_bytes < opts.wal_size_for_flush) {
        flush_memtable = false;
      }
      // The WAL list is fetched again later, after the optional flush.
      live_wal_files.clear();
    }
  }

  // This is a modified version of GetLiveFiles, to get access to more
  // metadata.
  mutex_.Lock();
  if (flush_memtable) {
    Status flush_status = FlushForGetLiveFiles();
    if (!flush_status.ok()) {
      mutex_.Unlock();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log, "Cannot Flush data %s\n",
                      flush_status.ToString().c_str());
      return flush_status;
    }
  }

  // Record every live table and blob file of each non-dropped column family.
  for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {
    if (cf->IsDropped()) {
      continue;
    }
    VersionStorageInfo* const storage = cf->current()->storage_info();
    const auto& cf_paths = cf->ioptions()->cf_paths;

    // Matching TableFileName() behavior: an out-of-range path id falls back
    // to the last configured path.
    auto dir_for_path_id = [&cf_paths](size_t path_id) {
      if (path_id < cf_paths.size()) {
        return cf_paths[path_id].path;
      }
      assert(false);
      return cf_paths.back().path;
    };

    const int level_count = storage->num_levels();
    for (int level = 0; level < level_count; ++level) {
      for (const auto& table_meta : storage->LevelFiles(level)) {
        assert(table_meta);

        LiveFileStorageInfo& entry = append_entry();
        const uint64_t table_number = table_meta->fd.GetNumber();

        entry.relative_filename = MakeTableFileName(table_number);
        entry.directory = dir_for_path_id(table_meta->fd.GetPathId());
        entry.file_number = table_number;
        entry.file_type = kTableFile;
        entry.size = table_meta->fd.GetFileSize();
        if (opts.include_checksum_info) {
          entry.file_checksum_func_name = table_meta->file_checksum_func_name;
          entry.file_checksum = table_meta->file_checksum;
          default_empty_checksum(entry);
        }
        entry.temperature = table_meta->temperature;
      }
    }

    for (const auto& blob_meta : storage->GetBlobFiles()) {
      assert(blob_meta);

      LiveFileStorageInfo& entry = append_entry();
      const uint64_t blob_number = blob_meta->GetBlobFileNumber();

      entry.relative_filename = BlobFileName(blob_number);
      // Blob files are looked up under path id 0.
      entry.directory = dir_for_path_id(/* path_id */ 0);
      entry.file_number = blob_number;
      entry.file_type = kBlobFile;
      entry.size = blob_meta->GetBlobFileSize();
      if (opts.include_checksum_info) {
        entry.file_checksum_func_name = blob_meta->GetChecksumMethod();
        entry.file_checksum = blob_meta->GetChecksumValue();
        default_empty_checksum(entry);
      }
      // TODO?: entry.temperature
    }
  }

  // Snapshot the remaining values that must be read under the mutex.
  const uint64_t manifest_number = versions_->manifest_file_number();
  const uint64_t manifest_size = versions_->manifest_file_size();
  const uint64_t options_number = versions_->options_file_number();
  const uint64_t options_size = versions_->options_file_size_;
  const uint64_t min_log_num = MinLogNumberToKeep();

  mutex_.Unlock();

  const std::string manifest_fname = DescriptorFileName(manifest_number);
  {  // MANIFEST
    LiveFileStorageInfo& entry = append_entry();

    entry.relative_filename = manifest_fname;
    entry.directory = GetName();
    entry.file_number = manifest_number;
    entry.file_type = kDescriptorFile;
    entry.size = manifest_size;
    // Only the first manifest_size bytes, as captured under the mutex,
    // are wanted.
    entry.trim_to_size = true;
    mark_checksum_unknown(entry);
  }

  {  // CURRENT
    LiveFileStorageInfo& entry = append_entry();

    entry.relative_filename = kCurrentFileName;
    entry.directory = GetName();
    entry.file_type = kCurrentFile;
    // CURRENT could be replaced, so the contents wanted for it are recorded
    // here: the manifest file name followed by a newline.
    entry.replacement_contents = manifest_fname + "\n";
    entry.size = manifest_fname.size() + 1;
    mark_checksum_unknown(entry);
  }

  // The OPTIONS file number is zero in read-write mode when writing the
  // OPTIONS file failed and the DB was configured with
  // `fail_if_options_file_error == false`. In read-only mode it is zero when
  // no OPTIONS file exists at all. No OPTIONS entry is recorded in those
  // cases.
  if (options_number != 0) {
    LiveFileStorageInfo& entry = append_entry();

    entry.relative_filename = OptionsFileName(options_number);
    entry.directory = GetName();
    entry.file_number = options_number;
    entry.file_type = kOptionsFile;
    entry.size = options_size;
    mark_checksum_unknown(entry);
  }

  // Some legacy testing stuff TODO: carefully clean up obsolete parts
  TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:FlushDone");

  TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
  TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");

  if (s.ok()) {
    s = FlushWAL(false /* sync */);
  }

  TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1");
  TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2");

  // if we have more than one column family, we need to also get WAL files
  if (s.ok()) {
    s = GetSortedWalFiles(live_wal_files);
  }
  if (!s.ok()) {
    return s;
  }

  const size_t wal_count = live_wal_files.size();

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "Number of log files %" ROCKSDB_PRIszt, wal_count);

  // Link WAL files. Copy the exact size of the last one because it is the
  // only one that has changes after the last flush.
  const auto wal_dir = immutable_db_options_.GetWalDir();
  for (size_t idx = 0; s.ok() && idx < wal_count; ++idx) {
    const auto& wal_file = live_wal_files[idx];

    // Only alive WAL files are listed.
    if (wal_file->Type() != kAliveLogFile) {
      continue;
    }
    // After a flush, WALs with a number below min_log_num are not listed.
    if (flush_memtable && wal_file->LogNumber() < min_log_num) {
      continue;
    }

    LiveFileStorageInfo& entry = append_entry();

    // PathName() is expected to start with '/'; strip that leading slash to
    // obtain the name relative to the WAL directory.
    const auto wal_path = wal_file->PathName();
    assert(!wal_path.empty() && wal_path[0] == '/');
    entry.relative_filename = wal_path.substr(1);
    entry.directory = wal_dir;
    entry.file_number = wal_file->LogNumber();
    entry.file_type = kWalFile;
    entry.size = wal_file->SizeFileBytes();
    // Only the last one should need to be trimmed.
    entry.trim_to_size = (idx + 1 == wal_count);
    mark_checksum_unknown(entry);
  }

  if (s.ok()) {
    // Only hand the results to the caller on success.
    *files = std::move(results);
  }
  return s;
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
#endif // ROCKSDB_LITE
|