Add BackupEngine feature to exclude files (#11030)

Summary:
We have a request for RocksDB to essentially support
disconnected incremental backup. In other words, if there is limited
or no connectivity to the primary backup dir, we should still be able to
take an incremental backup relative to that primary backup dir,
assuming some metadata about that primary dir is available (and
obviously anticipating primary backup dir will be fully available if
restore is needed).

To support that, this feature allows the API user to "exclude" DB
files from backup. This only applies to files that can be shared
between backups (sst and blob files), and excluded files are
tracked in the backup metadata sufficiently to ensure they are
restored at restore time. At restore time, the user provides
a set of alternate backup directories (as open BackupEngines, which
can be read-only), and excluded files must be found in one of the
backup directories ("included" in some backup).

This feature depends on backup schema version 2 features, though
schema version 2.0 support is not sufficient to read / restore a
backup with exclusions. This change updates the schema version to
2.1 because of this feature, so that it's easy to recognize whether
a RocksDB release supports this feature, while backups not using the
feature are fully compatible with 2.0.

Also in this PR:
* Stacked on https://github.com/facebook/rocksdb/pull/11029
* Allow progress_callback to be empty, not just no-op function, and
recover from exceptions thrown by BackupEngine callbacks.
* The internal-only `AsBackupEngine()` function is working around the
diamond hierarchy of `BackupEngineImplThreadSafe` to get to the
internals, without using confusing features like virtual inheritance.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11030

Test Plan: unit tests added / updated

Reviewed By: ajkr

Differential Revision: D42004388

Pulled By: pdillinger

fbshipit-source-id: 31b6e533d308a5462e528d9012d650482d974077
This commit is contained in:
Peter Dillinger 2022-12-29 10:42:50 -08:00 committed by Facebook GitHub Bot
parent bec4264813
commit 02f2b20864
4 changed files with 510 additions and 87 deletions

View File

@ -19,6 +19,9 @@
### New Features
* When an SstPartitionerFactory is configured, CompactRange() now automatically selects for compaction any files overlapping a partition boundary that is in the compaction range, even if no actual entries are in the requested compaction range. With this feature, manual compaction can be used to (re-)establish SST partition points when SstPartitioner changes, without a full compaction.
### New Features
* Add BackupEngine feature to exclude files from backup that are known to be backed up elsewhere, using `CreateBackupOptions::exclude_files_callback`. To restore the DB, the excluded files must be provided in alternative backup directories using `RestoreOptions::alternate_dirs`.
## 7.9.0 (11/21/2022)
### Performance Improvements
* Fixed an iterator performance regression for delete range users when scanning through a consecutive sequence of range tombstones (#10877).

View File

@ -11,6 +11,7 @@
#ifndef ROCKSDB_LITE
#include <cstdint>
#include <forward_list>
#include <functional>
#include <map>
#include <string>
@ -23,6 +24,8 @@
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
class BackupEngineReadOnlyBase;
class BackupEngine;
// The default DB file checksum function name.
constexpr char kDbFileChecksumFuncName[] = "FileChecksumCrc32c";
@ -270,6 +273,28 @@ inline BackupEngineOptions::ShareFilesNaming operator|(
return static_cast<BackupEngineOptions::ShareFilesNaming>(l | r);
}
// Identifying information about a backup shared file that is (or might be)
// excluded from a backup using exclude_files_callback.
struct BackupExcludedFileInfo {
explicit BackupExcludedFileInfo(const std::string& _relative_file)
: relative_file(_relative_file) {}
// File name and path relative to the backup dir.
std::string relative_file;
};
// An auxiliary structure for exclude_files_callback
struct MaybeExcludeBackupFile {
explicit MaybeExcludeBackupFile(BackupExcludedFileInfo&& _info)
: info(std::move(_info)) {}
// Identifying information about a backup shared file that could be excluded
const BackupExcludedFileInfo info;
// API user sets to true if the file should be excluded from this backup
bool exclude_decision = false;
};
struct CreateBackupOptions {
// Flush will always trigger if 2PC is enabled.
// If write-ahead logs are disabled, set flush_before_backup=true to
@ -278,10 +303,31 @@ struct CreateBackupOptions {
// Callback for reporting progress, based on callback_trigger_interval_size.
//
// RocksDB callbacks are NOT exception-safe. A callback completing with an
// exception can lead to undefined behavior in RocksDB, including data loss,
// unreported corruption, deadlocks, and more.
std::function<void()> progress_callback = []() {};
// An exception thrown from the callback will result in Status::Aborted from
// the operation.
std::function<void()> progress_callback = {};
// A callback that allows the API user to select files for exclusion, such
// as if the files are known to exist in an alternate backup directory.
// Only "shared" files can be excluded from backups. This is an advanced
// feature because the BackupEngine user is trusted to keep track of files
// such that the DB can be restored.
//
// Input to the callback is a [begin,end) range of sharable files live in
// the DB being backed up, and the callback implementation sets
// exclude_decision=true for files to exclude. A callback offers maximum
// flexibility, e.g. if remote files are unavailable at backup time but
// whose existence has been recorded somewhere. In case of an empty or
// no-op callback, all files are included in the backup .
//
// To restore the DB, RestoreOptions::alternate_dirs must be used to provide
// the excluded files.
//
// An exception thrown from the callback will result in Status::Aborted from
// the operation.
std::function<void(MaybeExcludeBackupFile* files_begin,
MaybeExcludeBackupFile* files_end)>
exclude_files_callback = {};
// If false, background_thread_cpu_priority is ignored.
// Otherwise, the cpu priority can be decreased,
@ -300,6 +346,11 @@ struct RestoreOptions {
// Default: false
bool keep_log_files;
// For backups that were created using exclude_files_callback, this
// option enables restoring those backups by providing BackupEngines on
// directories known to contain the required files.
std::forward_list<BackupEngineReadOnlyBase*> alternate_dirs;
explicit RestoreOptions(bool _keep_log_files = false)
: keep_log_files(_keep_log_files) {}
};
@ -324,9 +375,15 @@ struct BackupInfo {
// Backup API user metadata
std::string app_metadata;
// Backup file details, if requested with include_file_details=true
// Backup file details, if requested with include_file_details=true.
// Does not include excluded_files.
std::vector<BackupFileInfo> file_details;
// Identifying information about shared files that were excluded from the
// created backup. See exclude_files_callback and alternate_dirs.
// This information is only provided if include_file_details=true.
std::vector<BackupExcludedFileInfo> excluded_files;
// DB "name" (a directory in the backup_env) for opening this backup as a
// read-only DB. This should also be used as the DBOptions::wal_dir, such
// as by default setting wal_dir="". See also env_for_open.
@ -348,8 +405,8 @@ struct BackupInfo {
BackupInfo() {}
BackupInfo(BackupID _backup_id, int64_t _timestamp, uint64_t _size,
uint32_t _number_files, const std::string& _app_metadata)
explicit BackupInfo(BackupID _backup_id, int64_t _timestamp, uint64_t _size,
uint32_t _number_files, const std::string& _app_metadata)
: backup_id(_backup_id),
timestamp(_timestamp),
size(_size),
@ -364,8 +421,8 @@ class BackupStatistics {
number_fail_backup = 0;
}
BackupStatistics(uint32_t _number_success_backup,
uint32_t _number_fail_backup)
explicit BackupStatistics(uint32_t _number_success_backup,
uint32_t _number_fail_backup)
: number_success_backup(_number_success_backup),
number_fail_backup(_number_fail_backup) {}
@ -462,6 +519,9 @@ class BackupEngineReadOnlyBase {
// Returns Status::OK() if all checks are good
virtual IOStatus VerifyBackup(BackupID backup_id,
bool verify_with_checksum = false) const = 0;
// Internal use only
virtual BackupEngine* AsBackupEngine() = 0;
};
// Append-only functions of a BackupEngine. See BackupEngine comment for

View File

@ -13,11 +13,13 @@
#include <atomic>
#include <cinttypes>
#include <cstdlib>
#include <exception>
#include <functional>
#include <future>
#include <limits>
#include <map>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <thread>
@ -155,16 +157,10 @@ class BackupEngineImpl {
void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) const;
IOStatus RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id, const std::string& db_dir,
const std::string& wal_dir) const;
IOStatus RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
const std::string& wal_dir) const {
// Note: don't read latest_valid_backup_id_ outside of lock
return RestoreDBFromBackup(options, kLatestBackupIDMarker, db_dir, wal_dir);
}
IOStatus RestoreDBFromBackup(
const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir, const std::string& wal_dir,
const std::list<const BackupEngineImpl*>& locked_restore_from_dirs) const;
IOStatus VerifyBackup(BackupID backup_id,
bool verify_with_checksum = false) const;
@ -220,6 +216,7 @@ class BackupEngineImpl {
FileInfo& operator=(const FileInfo&) = delete;
int refs;
// Relative path from backup dir
const std::string filename;
const uint64_t size;
// crc32c checksum as hex. empty == unknown / unavailable
@ -233,7 +230,7 @@ class BackupEngineImpl {
const std::string db_session_id;
Temperature temp;
std::string GetDbFileName() {
std::string GetDbFileName() const {
std::string rv;
// extract the filename part
size_t slash = filename.find_last_of('/');
@ -415,6 +412,10 @@ class BackupEngineImpl {
IOStatus AddFile(std::shared_ptr<FileInfo> file_info);
void AddExcludedFile(const std::string& relative_file) {
excluded_files_.emplace_back(relative_file);
}
IOStatus Delete(bool delete_meta = true);
bool Empty() const { return files_.empty(); }
@ -431,6 +432,10 @@ class BackupEngineImpl {
return files_;
}
const std::vector<BackupExcludedFileInfo>& GetExcludedFiles() const {
return excluded_files_;
}
// @param abs_path_to_size Pre-fetched file sizes (bytes).
IOStatus LoadFromFile(
const std::string& backup_dir,
@ -488,6 +493,7 @@ class BackupEngineImpl {
std::string const meta_tmp_filename_;
// files with relative paths (without "/" prefix!!)
std::vector<std::shared_ptr<FileInfo>> files_;
std::vector<BackupExcludedFileInfo> excluded_files_;
std::unordered_map<std::string, std::shared_ptr<FileInfo>>* file_infos_;
Env* env_;
mutable std::shared_ptr<Env> env_for_open_;
@ -681,17 +687,19 @@ class BackupEngineImpl {
return *this;
}
CopyOrCreateWorkItem(
std::string _src_path, std::string _dst_path,
const Temperature _src_temperature, const Temperature _dst_temperature,
std::string _contents, Env* _src_env, Env* _dst_env,
EnvOptions _src_env_options, bool _sync, RateLimiter* _rate_limiter,
uint64_t _size_limit, Statistics* _stats,
std::function<void()> _progress_callback = []() {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
const std::string& _src_checksum_hex = "",
const std::string& _db_id = "", const std::string& _db_session_id = "")
CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
const Temperature _src_temperature,
const Temperature _dst_temperature,
std::string _contents, Env* _src_env, Env* _dst_env,
EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit,
Statistics* _stats,
std::function<void()> _progress_callback = {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
const std::string& _src_checksum_hex = "",
const std::string& _db_id = "",
const std::string& _db_session_id = "")
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
src_temperature(_src_temperature),
@ -758,6 +766,9 @@ class BackupEngineImpl {
dst_relative(std::move(_dst_relative)) {}
};
using BackupWorkItemPair =
std::pair<CopyOrCreateWorkItem, BackupAfterCopyOrCreateWorkItem>;
struct RestoreAfterCopyOrCreateWorkItem {
std::future<CopyOrCreateResult> result;
std::string from_file;
@ -807,13 +818,14 @@ class BackupEngineImpl {
// @param contents If non-empty, the file will be created with these contents.
IOStatus AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
std::deque<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
std::deque<BackupWorkItemPair>* excludable_items, BackupID backup_id,
bool shared, const std::string& src_dir,
const std::string& fname, // starts with "/"
const EnvOptions& src_env_options, RateLimiter* rate_limiter,
FileType file_type, uint64_t size_bytes, Statistics* stats,
uint64_t size_limit = 0, bool shared_checksum = false,
std::function<void()> progress_callback = []() {},
std::function<void()> progress_callback = {},
const std::string& contents = std::string(),
const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
const std::string& src_checksum_str = kUnknownFileChecksum,
@ -923,8 +935,37 @@ class BackupEngineImplThreadSafe : public BackupEngine,
IOStatus RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id, const std::string& db_dir,
const std::string& wal_dir) const override {
ReadLock lock(&mutex_);
return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
// TSAN reports a lock inversion (potential deadlock) if we acquire read
// locks in different orders. Assuming the implementation of RWMutex
// allows simultaneous read locks, there should be no deadlock, because
// there is no write lock involved here. Nevertheless, to appease TSAN and
// in case of degraded RWMutex implementation, we lock the BackupEngines
// including this one and those in options.alternate_dirs in a consistent
// order.
// However, locked_restore_from_dirs is kept in "search" order.
std::list<const BackupEngineImpl*> locked_restore_from_dirs;
std::vector<port::RWMutex*> mutexes;
// Add `this`
locked_restore_from_dirs.emplace_back(&impl_);
mutexes.push_back(&mutex_);
// Add alternates
for (BackupEngineReadOnlyBase* be : options.alternate_dirs) {
BackupEngineImplThreadSafe* bets =
static_cast_with_check<BackupEngineImplThreadSafe>(
be->AsBackupEngine());
locked_restore_from_dirs.emplace_back(&bets->impl_);
mutexes.push_back(&bets->mutex_);
}
// Acquire read locks in pointer order
std::sort(mutexes.begin(), mutexes.end());
std::vector<ReadLock> locks(mutexes.begin(), mutexes.end());
// Impl
return impl_.RestoreDBFromBackup(options, backup_id, db_dir, wal_dir,
locked_restore_from_dirs);
}
using BackupEngine::RestoreDBFromLatestBackup;
@ -941,6 +982,8 @@ class BackupEngineImplThreadSafe : public BackupEngine,
return impl_.VerifyBackup(backup_id, verify_with_checksum);
}
BackupEngine* AsBackupEngine() override { return this; }
// Not public API but needed
IOStatus Initialize() {
// No locking needed
@ -966,6 +1009,7 @@ class BackupEngineImplThreadSafe : public BackupEngine,
mutable port::RWMutex mutex_;
BackupEngineImpl impl_;
};
} // namespace
IOStatus BackupEngine::Open(const BackupEngineOptions& options, Env* env,
@ -1300,6 +1344,12 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
return IOStatus::InvalidArgument("App metadata too large");
}
bool maybe_exclude_items = bool{options.exclude_files_callback};
if (maybe_exclude_items && options_.schema_version < 2) {
return IOStatus::InvalidArgument(
"exclude_files_callback requires schema_version >= 2");
}
if (options.decrease_background_thread_cpu_priority) {
if (options.background_thread_cpu_priority < threads_cpu_priority_) {
threads_cpu_priority_.store(options.background_thread_cpu_priority);
@ -1364,7 +1414,8 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
// live file.
std::unordered_set<std::string> live_dst_paths;
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
std::deque<BackupWorkItemPair> excludable_items;
std::deque<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
// Add a CopyOrCreateWorkItem to the channel for each live file
Status disabled = db->DisableFileDeletions();
DBOptions db_options = db->GetDBOptions();
@ -1436,7 +1487,8 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
break;
}
io_st = AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
live_dst_paths, backup_items_to_finish,
maybe_exclude_items ? &excludable_items : nullptr, new_backup_id,
options_.share_table_files &&
(type == kTableFile || type == kBlobFile),
src_dirname, fname, src_env_options, rate_limiter, type,
@ -1451,7 +1503,8 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
FileType type) {
Log(options_.info_log, "add file for backup %s", fname.c_str());
return AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
live_dst_paths, backup_items_to_finish,
maybe_exclude_items ? &excludable_items : nullptr, new_backup_id,
false /* shared */, "" /* src_dir */, fname,
EnvOptions() /* src_env_options */, rate_limiter, type,
contents.size(), db_options.statistics.get(), 0 /* size_limit */,
@ -1464,7 +1517,46 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
new_backup->SetSequenceNumber(sequence_number);
}
}
ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
ROCKS_LOG_INFO(options_.info_log, "add files for backup done.");
if (io_s.ok() && maybe_exclude_items) {
assert(options.exclude_files_callback);
size_t count = excludable_items.size();
std::vector<MaybeExcludeBackupFile> maybe_exclude_files;
maybe_exclude_files.reserve(count);
for (auto& e : excludable_items) {
maybe_exclude_files.emplace_back(
BackupExcludedFileInfo(e.second.dst_relative));
}
if (count > 0) {
try {
options.exclude_files_callback(
&maybe_exclude_files.front(),
/*end pointer*/ &maybe_exclude_files.back() + 1);
} catch (const std::exception& exn) {
io_s = IOStatus::Aborted("Exception in exclude_files_callback: " +
std::string(exn.what()));
} catch (...) {
io_s = IOStatus::Aborted("Unknown exception in exclude_files_callback");
}
}
if (io_s.ok()) {
for (size_t i = 0; i < count; ++i) {
auto& e = excludable_items[i];
if (maybe_exclude_files[i].exclude_decision) {
new_backup.get()->AddExcludedFile(e.second.dst_relative);
} else {
files_to_copy_or_create_.write(std::move(e.first));
backup_items_to_finish.push_back(std::move(e.second));
}
}
}
excludable_items.clear();
} else {
assert(!options.exclude_files_callback);
assert(excludable_items.empty());
}
ROCKS_LOG_INFO(options_.info_log,
"dispatch files for backup done, wait for finish.");
IOStatus item_io_status;
for (auto& item : backup_items_to_finish) {
item.result.wait();
@ -1699,7 +1791,7 @@ void BackupEngineImpl::SetBackupInfoFromBackupMeta(
auto& file_details = backup_info->file_details;
file_details.reserve(meta.GetFiles().size());
for (auto& file_ptr : meta.GetFiles()) {
BackupFileInfo& finfo = *file_details.emplace(file_details.end());
BackupFileInfo& finfo = file_details.emplace_back();
finfo.relative_filename = file_ptr->filename;
finfo.size = file_ptr->size;
finfo.directory = dir;
@ -1711,7 +1803,10 @@ void BackupEngineImpl::SetBackupInfoFromBackupMeta(
finfo.file_type = type;
}
// TODO: temperature, file_checksum, file_checksum_func_name
// finfo.temperature = file_ptr->temp;
}
backup_info->excluded_files = meta.GetExcludedFiles();
backup_info->name_for_open = GetAbsolutePath(GetPrivateFileRel(id));
backup_info->name_for_open.pop_back(); // remove trailing '/'
backup_info->env_for_open = meta.GetEnvForOpen();
@ -1769,7 +1864,8 @@ void BackupEngineImpl::GetCorruptedBackups(
IOStatus BackupEngineImpl::RestoreDBFromBackup(
const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir, const std::string& wal_dir) const {
const std::string& db_dir, const std::string& wal_dir,
const std::list<const BackupEngineImpl*>& locked_restore_from_dirs) const {
assert(initialized_);
if (backup_id == kLatestBackupIDMarker) {
// Note: Read latest_valid_backup_id_ inside of lock
@ -1829,6 +1925,37 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
DeleteChildren(db_dir);
}
// Files to restore, and from where (taking into account excluded files)
std::vector<std::pair<const BackupEngineImpl*, const FileInfo*>>
restore_file_infos;
restore_file_infos.reserve(backup->GetFiles().size() +
backup->GetExcludedFiles().size());
for (const auto& ef : backup->GetExcludedFiles()) {
const std::string& file = ef.relative_file;
bool found = false;
for (auto be : locked_restore_from_dirs) {
auto it = be->backuped_file_infos_.find(file);
if (it != backuped_file_infos_.end()) {
restore_file_infos.emplace_back(be, &*it->second);
found = true;
break;
}
}
if (!found) {
return IOStatus::InvalidArgument(
"Excluded file " + file + " not found in other backups nor in " +
std::to_string(locked_restore_from_dirs.size() - 1) +
" alternate backup directories");
}
}
// Non-excluded files
for (const auto& file_info_shared : backup->GetFiles()) {
restore_file_infos.emplace_back(this, &*file_info_shared);
}
IOStatus io_s;
std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
std::string temporary_current_file;
@ -1836,8 +1963,13 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
std::unique_ptr<FSDirectory> db_dir_for_fsync;
std::unique_ptr<FSDirectory> wal_dir_for_fsync;
for (const auto& file_info : backup->GetFiles()) {
for (const auto& engine_and_file_info : restore_file_infos) {
const FileInfo* file_info = engine_and_file_info.second;
const std::string& file = file_info->filename;
std::string absolute_file =
engine_and_file_info.first->GetAbsolutePath(file);
Env* src_env = engine_and_file_info.first->backup_env_;
// 1. get DB filename
std::string dst = file_info->GetDbFileName();
@ -1881,8 +2013,8 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */,
file_info->temp, "" /* contents */, backup_env_, db_env_,
absolute_file, dst, Temperature::kUnknown /* src_temp */,
file_info->temp, "" /* contents */, src_env, db_env_,
EnvOptions() /* src_env_options */, options_.sync,
options_.restore_rate_limiter.get(), file_info->size,
nullptr /* stats */);
@ -2122,8 +2254,19 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
while (*bytes_toward_next_callback >=
options_.callback_trigger_interval_size) {
*bytes_toward_next_callback -= options_.callback_trigger_interval_size;
std::lock_guard<std::mutex> lock(byte_report_mutex_);
progress_callback();
if (progress_callback) {
std::lock_guard<std::mutex> lock(byte_report_mutex_);
try {
progress_callback();
} catch (const std::exception& exn) {
io_s = IOStatus::Aborted("Exception in progress_callback: " +
std::string(exn.what()));
break;
} catch (...) {
io_s = IOStatus::Aborted("Unknown exception in progress_callback");
break;
}
}
}
} while (io_s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
@ -2144,11 +2287,12 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
// fname will always start with "/"
IOStatus BackupEngineImpl::AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, const EnvOptions& src_env_options,
RateLimiter* rate_limiter, FileType file_type, uint64_t size_bytes,
Statistics* stats, uint64_t size_limit, bool shared_checksum,
std::deque<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
std::deque<BackupWorkItemPair>* excludable_items, BackupID backup_id,
bool shared, const std::string& src_dir, const std::string& fname,
const EnvOptions& src_env_options, RateLimiter* rate_limiter,
FileType file_type, uint64_t size_bytes, Statistics* stats,
uint64_t size_limit, bool shared_checksum,
std::function<void()> progress_callback, const std::string& contents,
const std::string& src_checksum_func_name,
const std::string& src_checksum_str, const Temperature src_temperature) {
@ -2342,8 +2486,6 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// Step 3: Add work item
if (!contents.empty() || need_to_copy) {
ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
copy_dest_path->c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature,
Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_,
@ -2353,8 +2495,21 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
if (excludable_items != nullptr && shared && shared_checksum &&
need_to_copy) {
ROCKS_LOG_INFO(options_.info_log, "Copying (if not excluded) %s to %s",
fname.c_str(), copy_dest_path->c_str());
excludable_items->emplace_back(std::move(copy_or_create_work_item),
std::move(after_copy_or_create_work_item));
} else {
// For files known not excluded, can start copying even before finishing
// the checkpoint
ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
copy_dest_path->c_str());
files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
backup_items_to_finish.push_back(
std::move(after_copy_or_create_work_item));
}
} else {
std::promise<CopyOrCreateResult> promise_result;
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
@ -2745,6 +2900,7 @@ const std::string kAppMetaDataFieldName{"metadata"};
const std::string kFileCrc32cFieldName{"crc32"};
const std::string kFileSizeFieldName{"size"};
const std::string kTemperatureFieldName{"temp"};
const std::string kExcludedFieldName{"ni::excluded"};
// Marks a (future) field that should cause failure if not recognized.
// Other fields are assumed to be ignorable. For example, in the future
@ -2765,8 +2921,7 @@ const std::string kNonIgnorableFieldPrefix{"ni::"};
// ...
//----------------------------------------------------------
//
// For schema version 2.x (not in public APIs, but
// forward-compatibility started):
// For schema version 2.x:
//----------------------------------------------------------
// schema_version <ver>
// <timestamp>
@ -2909,20 +3064,6 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& filename = components[0];
uint64_t actual_size;
const std::shared_ptr<FileInfo> file_info = GetFile(filename);
if (file_info) {
actual_size = file_info->size;
} else {
std::string abs_path = backup_dir + "/" + filename;
auto e = abs_path_to_size.find(abs_path);
if (e == abs_path_to_size.end()) {
return IOStatus::Corruption(
"Pathname in meta file not found on disk: " + abs_path);
}
actual_size = e->second;
}
if (schema_major_version >= 2) {
if (components.size() % 2 != 1) {
return IOStatus::Corruption(
@ -2944,8 +3085,10 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
}
std::optional<uint64_t> expected_size{};
std::string checksum_hex;
Temperature temp = Temperature::kUnknown;
bool excluded = false;
for (unsigned i = 1; i < components.size(); i += 2) {
const std::string& field_name = components[i];
const std::string& field_data = components[i + 1];
@ -2959,14 +3102,7 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
checksum_hex = ChecksumInt32ToHex(checksum_value);
} else if (field_name == kFileSizeFieldName) {
uint64_t ex_size =
std::strtoull(field_data.c_str(), nullptr, /*base*/ 10);
if (ex_size != actual_size) {
return IOStatus::Corruption(
"For file " + filename + " expected size " +
std::to_string(ex_size) + " but found size" +
std::to_string(actual_size));
}
expected_size = std::strtoull(field_data.c_str(), nullptr, /*base*/ 10);
} else if (field_name == kTemperatureFieldName) {
auto iter = temperature_string_map.find(field_data);
if (iter != temperature_string_map.end()) {
@ -2977,6 +3113,15 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
// be safe.
temp = Temperature::kUnknown;
}
} else if (field_name == kExcludedFieldName) {
if (field_data == "true") {
excluded = true;
} else if (field_data == "false") {
excluded = false;
} else {
return IOStatus::NotSupported("Unrecognized value \"" + field_data +
"\" for field " + field_name);
}
} else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
return IOStatus::NotSupported("Unrecognized non-ignorable file field " +
field_name + " (from future version?)");
@ -2989,8 +3134,29 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
}
files.emplace_back(new FileInfo(filename, actual_size, checksum_hex,
/*id*/ "", /*sid*/ "", temp));
if (excluded) {
excluded_files_.emplace_back(filename);
} else {
// Verify file exists, with expected size
std::string abs_path = backup_dir + "/" + filename;
auto e = abs_path_to_size.find(abs_path);
if (e == abs_path_to_size.end()) {
return IOStatus::Corruption(
"Pathname in meta file not found on disk: " + abs_path);
}
uint64_t actual_size = e->second;
if (expected_size.has_value() && *expected_size != actual_size) {
return IOStatus::Corruption("For file " + filename + " expected size " +
std::to_string(*expected_size) +
" but found size" +
std::to_string(actual_size));
}
// NOTE: FileInfo will be coalesced for sharing later (AddFile below)
files.emplace_back(
std::make_shared<FileInfo>(filename, actual_size, checksum_hex,
/*id*/ "", /*sid*/ "", temp));
}
}
if (footer_present) {
@ -3046,7 +3212,7 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
const std::vector<std::string> minor_version_strings{
"", // invalid major version 0
"", // implicit major version 1
"2.0",
"2.1",
};
IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
@ -3124,6 +3290,11 @@ IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
buf << "\n";
}
for (const auto& file : excluded_files_) {
assert(schema_version >= 2);
buf << file.relative_file << " " << kExcludedFieldName << " true\n";
}
if (schema_test_options && !schema_test_options->footer_fields.empty()) {
buf << kFooterMarker << "\n";
for (auto& e : schema_test_options->footer_fields) {

View File

@ -16,9 +16,11 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <random>
#include <stdexcept>
#include <string>
#include <utility>
@ -754,7 +756,7 @@ class BackupEngineTest : public testing::Test {
void CloseBackupEngine() { backup_engine_.reset(nullptr); }
// cross-cutting test of GetBackupInfo
void AssertBackupInfoConsistency() {
void AssertBackupInfoConsistency(bool allow_excluded = false) {
std::vector<BackupInfo> backup_info;
backup_engine_->GetBackupInfo(&backup_info, /*with file details*/ true);
std::map<std::string, uint64_t> file_sizes;
@ -774,6 +776,9 @@ class BackupEngineTest : public testing::Test {
sum_for_backup += file.size;
}
ASSERT_EQ(backup.size, sum_for_backup);
if (!allow_excluded) {
ASSERT_EQ(backup.excluded_files.size(), 0);
}
}
std::vector<BackupID> corrupt_backup_ids;
@ -3094,10 +3099,25 @@ TEST_F(BackupEngineTest, OpenBackupAsReadOnlyDB) {
TEST_F(BackupEngineTest, ProgressCallbackDuringBackup) {
DestroyDBWithoutCheck(dbname_, options_);
// Too big for this small DB
engine_options_->callback_trigger_interval_size = 100000;
OpenDBAndBackupEngine(true);
FillDB(db_.get(), 0, 100);
// First test exception handling
// Easily small enough for this small DB
engine_options_->callback_trigger_interval_size = 1000;
OpenBackupEngine();
ASSERT_TRUE(
backup_engine_->CreateNewBackup(db_.get(), true, []() { throw 42; })
.IsAborted());
ASSERT_TRUE(backup_engine_
->CreateNewBackup(db_.get(), true,
[]() { throw std::out_of_range("blah"); })
.IsAborted());
// Too big for this small DB
engine_options_->callback_trigger_interval_size = 100000;
OpenBackupEngine();
bool is_callback_invoked = false;
ASSERT_OK(backup_engine_->CreateNewBackup(
db_.get(), true,
@ -4180,7 +4200,7 @@ TEST_F(BackupEngineTest, FileTemperatures) {
&info, /*include_file_details*/ true));
ASSERT_GT(info.file_details.size(), 2);
for (auto& e : info.file_details) {
ASSERT_EQ(expected_temps[e.file_number], e.temperature);
EXPECT_EQ(expected_temps[e.file_number], e.temperature);
}
// Restore backup to another virtual (tiered) dir
@ -4202,6 +4222,175 @@ TEST_F(BackupEngineTest, FileTemperatures) {
}
}
TEST_F(BackupEngineTest, ExcludeFiles) {
// Required for excluding files
engine_options_->schema_version = 2;
// Need a sufficent set of file numbers
options_.level0_file_num_compaction_trigger = 100;
OpenDBAndBackupEngine(true, false, kShareWithChecksum);
// Need a sufficent set of file numbers
const int keys_iteration = 5000;
FillDB(db_.get(), 0, keys_iteration / 3);
FillDB(db_.get(), keys_iteration / 3, keys_iteration * 2 / 3);
FillDB(db_.get(), keys_iteration * 2 / 3, keys_iteration);
CloseAndReopenDB();
BackupEngine* alt_backup_engine;
BackupEngineOptions alt_engine_options{*engine_options_};
// Use an alternate Env to test that support
std::string backup_alt_chroot = test::PerThreadDBPath("db_alt_backups");
EXPECT_OK(Env::Default()->CreateDirIfMissing(backup_alt_chroot));
alt_engine_options.backup_dir = "/altbk";
std::shared_ptr<FileSystem> alt_fs{
NewChrootFileSystem(FileSystem::Default(), backup_alt_chroot)};
std::unique_ptr<Env> alt_env{new CompositeEnvWrapper(Env::Default(), alt_fs)};
alt_engine_options.backup_env = alt_env.get();
ASSERT_OK(BackupEngine::Open(test_db_env_.get(), alt_engine_options,
&alt_backup_engine));
// Ensure each backup is same set of files
db_.reset();
DB* db = nullptr;
ASSERT_OK(DB::OpenForReadOnly(options_, dbname_, &db));
// A callback that throws should cleanly fail the backup creation.
// Do this early to ensure later operations still work.
CreateBackupOptions cbo;
cbo.exclude_files_callback = [](MaybeExcludeBackupFile* /*files_begin*/,
MaybeExcludeBackupFile* /*files_end*/) {
throw 42;
};
ASSERT_TRUE(backup_engine_->CreateNewBackup(cbo, db).IsAborted());
cbo.exclude_files_callback = [](MaybeExcludeBackupFile* /*files_begin*/,
MaybeExcludeBackupFile* /*files_end*/) {
throw std::out_of_range("blah");
};
ASSERT_TRUE(backup_engine_->CreateNewBackup(cbo, db).IsAborted());
// Include files only in given bucket, based on modulus and remainder
constexpr int modulus = 4;
int remainder = 0;
cbo.exclude_files_callback = [&remainder](MaybeExcludeBackupFile* files_begin,
MaybeExcludeBackupFile* files_end) {
for (auto* f = files_begin; f != files_end; ++f) {
std::string s = StringSplit(f->info.relative_file, '/').back();
s = s.substr(0, s.find("_"));
int64_t num = std::strtoll(s.c_str(), nullptr, /*base*/ 10);
// Exclude if not a match
f->exclude_decision = (num % modulus) != remainder;
}
};
BackupID first_id{};
BackupID last_alt_id{};
remainder = 0;
ASSERT_OK(backup_engine_->CreateNewBackup(cbo, db, &first_id));
AssertBackupInfoConsistency(/*allow excluded*/ true);
remainder = 1;
ASSERT_OK(alt_backup_engine->CreateNewBackup(cbo, db));
AssertBackupInfoConsistency(/*allow excluded*/ true);
remainder = 2;
ASSERT_OK(backup_engine_->CreateNewBackup(cbo, db));
AssertBackupInfoConsistency(/*allow excluded*/ true);
remainder = 3;
ASSERT_OK(alt_backup_engine->CreateNewBackup(cbo, db, &last_alt_id));
AssertBackupInfoConsistency(/*allow excluded*/ true);
// Close DB
ASSERT_OK(db->Close());
delete db;
db = nullptr;
for (auto be_pair :
{std::make_pair(backup_engine_.get(), alt_backup_engine),
std::make_pair(alt_backup_engine, backup_engine_.get())}) {
DestroyDB(dbname_, options_);
RestoreOptions ro;
// Fails without alternate dir
ASSERT_TRUE(be_pair.first->RestoreDBFromLatestBackup(dbname_, dbname_, ro)
.IsInvalidArgument());
DestroyDB(dbname_, options_);
// Works with alternate dir
ro.alternate_dirs.push_front(be_pair.second);
ASSERT_OK(be_pair.first->RestoreDBFromLatestBackup(dbname_, dbname_, ro));
// Check DB contents
db = OpenDB();
AssertExists(db, 0, keys_iteration);
delete db;
}
// Should still work after close and re-open
CloseBackupEngine();
OpenBackupEngine();
for (auto be_pair :
{std::make_pair(backup_engine_.get(), alt_backup_engine),
std::make_pair(alt_backup_engine, backup_engine_.get())}) {
DestroyDB(dbname_, options_);
RestoreOptions ro;
ro.alternate_dirs.push_front(be_pair.second);
ASSERT_OK(be_pair.first->RestoreDBFromLatestBackup(dbname_, dbname_, ro));
}
// Deletion semantics are tricky when within a single backup dir one backup
// includes a file and the other backup excluded the file. The excluded one
// does not have a persistent record of metadata like file checksum, etc.
// Although it would be possible to amend the backup with the excluded file,
// that is not currently supported (unless you open the backup as read-only
// DB and take another backup of it). The "excluded" reference to the file
// is like a weak reference: it doesn't prevent the file from being deleted
// if all the backups with "included" references to it are deleted.
CloseBackupEngine();
OpenBackupEngine();
AssertBackupInfoConsistency(/*allow excluded*/ true);
ASSERT_OK(backup_engine_->DeleteBackup(first_id));
ASSERT_OK(alt_backup_engine->DeleteBackup(last_alt_id));
// Includes check for any leaked backup files
AssertBackupInfoConsistency(/*allow excluded*/ true);
// Excluded file(s) deleted, unable to restore
for (auto be_pair :
{std::make_pair(backup_engine_.get(), alt_backup_engine),
std::make_pair(alt_backup_engine, backup_engine_.get())}) {
RestoreOptions ro;
ro.alternate_dirs.push_front(be_pair.second);
ASSERT_TRUE(be_pair.first->RestoreDBFromLatestBackup(dbname_, dbname_, ro)
.IsInvalidArgument());
}
// Close & Re-open (no crash, etc.)
CloseBackupEngine();
OpenBackupEngine();
AssertBackupInfoConsistency(/*allow excluded*/ true);
// Excluded file(s) deleted, unable to restore
for (auto be_pair :
{std::make_pair(backup_engine_.get(), alt_backup_engine),
std::make_pair(alt_backup_engine, backup_engine_.get())}) {
RestoreOptions ro;
ro.alternate_dirs.push_front(be_pair.second);
ASSERT_TRUE(be_pair.first->RestoreDBFromLatestBackup(dbname_, dbname_, ro)
.IsInvalidArgument());
}
// Ensure files are not leaked after removing everything.
ASSERT_OK(backup_engine_->DeleteBackup(first_id + 1));
ASSERT_OK(alt_backup_engine->DeleteBackup(last_alt_id - 1));
// Includes check for leaked backups files
AssertBackupInfoConsistency(/*allow excluded*/ false);
}
} // namespace
} // namespace ROCKSDB_NAMESPACE