BackupEngine verifies table file checksums on creating new backups (#7015)

Summary:
When table file checksums are enabled and stored in the DB manifest by using the RocksDB default crc32c checksum function, BackupEngine will calculate the crc32c checksum of the file to be copied and compare the calculated result with the one stored in the DB manifest before copying the file to the backup directory.

After copying to the backup directory, BackupEngine will verify the checksum of the copied file with the one calculated before copying. This helps detect some rare corruption events such as bit-flips during the copying process.

No verification with checksums in DB manifest will be performed if the table file checksum function is not the RocksDB default crc32c checksum function.

In addition, If `share_table_files` and `share_files_with_checksum` are true, BackupEngine will compare the checksums computed before and after copying of the table files.

Corresponding tests are added.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/7015

Test Plan: Passed make check

Reviewed By: pdillinger

Differential Revision: D22165732

Pulled By: gg814

fbshipit-source-id: ee0e8cc397c455eba64545c29380b9d9853588ec
This commit is contained in:
Zitan Chen 2020-07-02 18:13:31 -07:00 committed by Facebook GitHub Bot
parent a680a7ea37
commit 373d5ac485
11 changed files with 389 additions and 31 deletions

View File

@ -6,6 +6,9 @@
### Behavior Changes
* Best-efforts recovery ignores CURRENT file completely. If CURRENT file is missing during recovery, best-efforts recovery still proceeds with MANIFEST file(s).
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
* When `file_checksum_gen_factory` is set to `GetFileChecksumGenCrc32cFactory()`, BackupEngine will compare the crc32c checksums of table files computed when creating a backup to the expected checksums stored in the DB manifest, and will fail `CreateNewBackup()` on mismatch (corruption). If the `file_checksum_gen_factory` is not set or set to any other customized factory, there is no checksum verification to detect if SST files in a DB are corrupt when read, copied, and independently checksummed by BackupEngine.
### Bug fixes
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too.
* Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement.
* Disable file deletion after MANIFEST write/sync failure until db re-open or Resume() so that subsequent re-open will not see MANIFEST referencing deleted SSTs.

View File

@ -3536,6 +3536,11 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
versions_->GetLiveFilesMetaData(metadata);
}
Status DBImpl::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
InstrumentedMutexLock l(&mutex_);
return versions_->GetLiveFilesChecksumInfo(checksum_list);
}
void DBImpl::GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* cf_meta) {
assert(column_family);

View File

@ -393,6 +393,9 @@ class DBImpl : public DB {
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData>* metadata) override;
virtual Status GetLiveFilesChecksumInfo(
FileChecksumList* checksum_list) override;
// Obtains the meta data of the specified column family of the DB.
// Status::NotFound() will be returned if the current DB does not have
// any column family match the specified name.

View File

@ -3010,6 +3010,11 @@ class ModelDB : public DB {
return Status::OK();
}
Status GetLiveFilesChecksumInfo(
FileChecksumList* /*checksum_list*/) override {
return Status::OK();
}
Status GetSortedWalFiles(VectorLogPtr& /*files*/) override {
return Status::OK();
}

View File

@ -1354,6 +1354,9 @@ class DB {
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData>* /*metadata*/) {}
// Return a list of all table checksum info
virtual Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) = 0;
// Obtains the meta data of the specified column family of the DB.
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* /*column_family*/,
ColumnFamilyMetaData* /*metadata*/) {}

View File

@ -24,6 +24,11 @@
namespace ROCKSDB_NAMESPACE {
// The default DB file checksum function name.
constexpr char kDbFileChecksumFuncName[] = "FileChecksumCrc32c";
// The default BackupEngine file checksum function name.
constexpr char kBackupFileChecksumFuncName[] = "crc32c";
// BackupTableNameOption describes possible naming schemes for backup
// table file names when the table files are stored in the shared_checksum
// directory (i.e., both share_table_files and share_files_with_checksum

View File

@ -347,6 +347,11 @@ class StackableDB : public DB {
db_->GetLiveFilesMetaData(metadata);
}
virtual Status GetLiveFilesChecksumInfo(
FileChecksumList* checksum_list) override {
return db_->GetLiveFilesChecksumInfo(checksum_list);
}
virtual void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
ColumnFamilyMetaData* cf_meta) override {
db_->GetColumnFamilyMetaData(column_family, cf_meta);

View File

@ -335,6 +335,14 @@ class BackupEngineImpl : public BackupEngine {
inline bool IsSstFile(const std::string& fname) const {
return fname.length() > 4 && fname.rfind(".sst") == fname.length() - 4;
}
inline std::string ChecksumInt32ToStr(const uint32_t& checksum_int) {
std::string checksum_str;
PutFixed32(&checksum_str, EndianSwapValue(checksum_int));
return checksum_str;
}
inline uint32_t ChecksumStrToInt32(const std::string& checksum_str) {
return EndianSwapValue(DecodeFixed32(checksum_str.c_str()));
}
// If size_limit == 0, there is no size limit, copy everything.
//
@ -382,6 +390,9 @@ class BackupEngineImpl : public BackupEngine {
uint64_t size_limit;
std::promise<CopyOrCreateResult> result;
std::function<void()> progress_callback;
bool verify_checksum_after_work;
std::string src_checksum_func_name;
std::string src_checksum_str;
CopyOrCreateWorkItem()
: src_path(""),
@ -392,7 +403,10 @@ class BackupEngineImpl : public BackupEngine {
src_env_options(),
sync(false),
rate_limiter(nullptr),
size_limit(0) {}
size_limit(0),
verify_checksum_after_work(false),
src_checksum_func_name(kUnknownFileChecksumFuncName),
src_checksum_str(kUnknownFileChecksum) {}
CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
@ -413,14 +427,21 @@ class BackupEngineImpl : public BackupEngine {
size_limit = o.size_limit;
result = std::move(o.result);
progress_callback = std::move(o.progress_callback);
verify_checksum_after_work = o.verify_checksum_after_work;
src_checksum_func_name = o.src_checksum_func_name;
src_checksum_str = o.src_checksum_str;
return *this;
}
CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
std::string _contents, Env* _src_env, Env* _dst_env,
EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit,
std::function<void()> _progress_callback = []() {})
CopyOrCreateWorkItem(
std::string _src_path, std::string _dst_path, std::string _contents,
Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit,
std::function<void()> _progress_callback = []() {},
const bool& _verify_checksum_after_work = false,
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
const std::string& _src_checksum_str = kUnknownFileChecksum)
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
contents(std::move(_contents)),
@ -430,7 +451,10 @@ class BackupEngineImpl : public BackupEngine {
sync(_sync),
rate_limiter(_rate_limiter),
size_limit(_size_limit),
progress_callback(_progress_callback) {}
progress_callback(_progress_callback),
verify_checksum_after_work(_verify_checksum_after_work),
src_checksum_func_name(_src_checksum_func_name),
src_checksum_str(_src_checksum_str) {}
};
struct BackupAfterCopyOrCreateWorkItem {
@ -529,7 +553,9 @@ class BackupEngineImpl : public BackupEngine {
uint64_t size_bytes, uint64_t size_limit = 0,
bool shared_checksum = false,
std::function<void()> progress_callback = []() {},
const std::string& contents = std::string());
const std::string& contents = std::string(),
const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
const std::string& src_checksum_str = kUnknownFileChecksum);
// backup state data
BackupID latest_backup_id_;
@ -812,6 +838,37 @@ Status BackupEngineImpl::Initialize() {
work_item.sync, work_item.rate_limiter, &result.size,
&result.checksum_value, work_item.size_limit,
work_item.progress_callback, &result.db_id, &result.db_session_id);
if (result.status.ok() && work_item.verify_checksum_after_work) {
// unknown checksum function name implies no db table file checksum in
// db manifest; work_item.verify_checksum_after_work being true means
// backup engine has calculated its crc32c checksum for the table
// file; therefore, we are able to compare the checksums.
if (work_item.src_checksum_func_name ==
kUnknownFileChecksumFuncName ||
work_item.src_checksum_func_name == kDbFileChecksumFuncName) {
uint32_t src_checksum_int =
ChecksumStrToInt32(work_item.src_checksum_str);
if (src_checksum_int != result.checksum_value) {
std::string checksum_info("Expected checksum is " +
ToString(src_checksum_int) +
" while computed checksum is " +
ToString(result.checksum_value));
result.status =
Status::Corruption("Checksum mismatch after copying to " +
work_item.dst_path + ": " + checksum_info);
}
} else {
std::string checksum_function_info(
"Existing checksum function is " +
work_item.src_checksum_func_name +
" while provided checksum function is " +
kBackupFileChecksumFuncName);
ROCKS_LOG_INFO(
options_.info_log,
"Unable to verify checksum after copying to %s: %s\n",
work_item.dst_path.c_str(), checksum_function_info.c_str());
}
}
work_item.result.set_value(std::move(result));
}
});
@ -893,6 +950,15 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
CheckpointImpl checkpoint(db);
uint64_t sequence_number = 0;
DBOptions db_options = db->GetDBOptions();
FileChecksumGenFactory* db_checksum_factory =
db_options.file_checksum_gen_factory.get();
const std::string kFileChecksumGenFactoryName =
"FileChecksumGenCrc32cFactory";
bool compare_checksum =
db_checksum_factory != nullptr &&
db_checksum_factory->Name() == kFileChecksumGenFactoryName
? true
: false;
EnvOptions src_raw_env_options(db_options);
s = checkpoint.CreateCustomCheckpoint(
db_options,
@ -903,7 +969,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
return Status::NotSupported();
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType type) {
uint64_t size_limit_bytes, FileType type,
const std::string& checksum_func_name,
const std::string& checksum_val) {
if (type == kLogFile && !options_.backup_log_files) {
return Status::OK();
}
@ -941,7 +1009,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
fname, src_env_options, rate_limiter, size_bytes,
size_limit_bytes,
options_.share_files_with_checksum && type == kTableFile,
options.progress_callback);
options.progress_callback, "" /* contents */,
checksum_func_name, checksum_val);
}
return st;
} /* copy_file_cb */,
@ -954,7 +1023,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
0 /* size_limit */, false /* shared_checksum */,
options.progress_callback, contents);
} /* create_file_cb */,
&sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64);
&sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64,
compare_checksum);
if (s.ok()) {
new_backup->SetSequenceNumber(sequence_number);
}
@ -1431,6 +1501,9 @@ Status BackupEngineImpl::CopyOrCreateFile(
data = contents;
}
size_limit -= data.size();
TEST_SYNC_POINT_CALLBACK(
"BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
IsSstFile(src) ? &data : nullptr);
if (!s.ok()) {
return s;
@ -1488,7 +1561,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
const std::string& fname, const EnvOptions& src_env_options,
RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
bool shared_checksum, std::function<void()> progress_callback,
const std::string& contents) {
const std::string& contents, const std::string& src_checksum_func_name,
const std::string& src_checksum_str) {
assert(!fname.empty() && fname[0] == '/');
assert(contents.empty() != src_dir.empty());
@ -1498,15 +1572,50 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
uint32_t checksum_value = 0;
std::string db_id;
std::string db_session_id;
// whether the checksum for a table file has been computed
bool has_checksum = false;
// Step 1: Prepare the relative path to destination
if (shared && shared_checksum) {
// Prepare checksum to add to file name
// Whenever a default checksum function name is passed in, we will verify it
// before copying. Note that only table files may have a known checksum name
// passed in.
//
// If no default checksum function name is passed in, we will calculate the
// checksum *before* copying in two cases (we always calcuate checksums when
// copying or creating for any file types):
// a) share_files_with_checksum is true and file type is table;
// b) share_table_files is true and the file exists already.
if (kDbFileChecksumFuncName == src_checksum_func_name) {
if (src_checksum_str == kUnknownFileChecksum) {
return Status::Aborted("Unkown checksum value for " + fname);
}
s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
&checksum_value);
if (!s.ok()) {
return s;
}
// Convert src_checksum_str to uint32_t and compare
uint32_t src_checksum_int = ChecksumStrToInt32(src_checksum_str);
if (src_checksum_int != checksum_value) {
std::string checksum_info(
"Expected checksum is " + ToString(src_checksum_int) +
" while computed checksum is " + ToString(checksum_value));
return Status::Corruption("Checksum mismatch before copying from " +
fname + ": " + checksum_info);
}
has_checksum = true;
}
// Step 1: Prepare the relative path to destination
if (shared && shared_checksum) {
// add checksum and file length to the file name
if (!has_checksum) {
s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
size_limit, &checksum_value);
if (!s.ok()) {
return s;
}
has_checksum = true;
}
if (GetTableNamingOption() == kChecksumAndDbSessionId) {
// Prepare db_session_id to add to the file name
// Ignore the returned status
@ -1596,10 +1705,13 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// the file is present and referenced by a backup
ROCKS_LOG_INFO(options_.info_log,
"%s already present, calculate checksum", fname.c_str());
s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
size_limit, &checksum_value);
if (!s.ok()) {
return s;
if (!has_checksum) {
s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
size_limit, &checksum_value);
if (!s.ok()) {
return s;
}
has_checksum = true;
}
// try to get the db identities as they are also members of
// the class CopyOrCreateResult
@ -1622,7 +1734,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
size_limit, progress_callback);
size_limit, progress_callback, has_checksum, src_checksum_func_name,
ChecksumInt32ToStr(checksum_value));
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);

View File

@ -678,6 +678,44 @@ class BackupableDBTest : public testing::Test {
}
}
Status CorruptRandomTableFileInDB() {
Random rnd(6);
std::vector<FileAttributes> children;
test_db_env_->GetChildrenFileAttributes(dbname_, &children);
if (children.size() <= 2) { // . and ..
return Status::NotFound("");
}
std::string fname;
uint64_t fsize = 0;
while (true) {
int i = rnd.Next() % children.size();
fname = children[i].name;
fsize = children[i].size_bytes;
// find an sst file
if (fsize > 0 && fname.length() > 4 &&
fname.rfind(".sst") == fname.length() - 4) {
fname = dbname_ + "/" + fname;
break;
}
}
std::string file_contents;
Status s = ReadFileToString(test_db_env_.get(), fname, &file_contents);
if (!s.ok()) {
return s;
}
s = test_db_env_->DeleteFile(fname);
if (!s.ok()) {
return s;
}
for (uint64_t i = 0; i < fsize; ++i) {
std::string tmp;
test::RandomString(&rnd, 1, &tmp);
file_contents[rnd.Next() % file_contents.size()] = tmp[0];
}
return WriteStringToFile(test_db_env_.get(), file_contents, fname);
}
// files
std::string dbname_;
std::string backupdir_;
@ -1137,6 +1175,155 @@ TEST_F(BackupableDBTest, CorruptFileMaintainSize) {
CloseDBAndBackupEngine();
}
// Test if BackupEngine will fail to create new backup if some table has been
// corrupted and the table file checksum is stored in the DB manifest
TEST_F(BackupableDBTest, TableFileCorruptedBeforeBackup) {
const int keys_iteration = 50000;
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kNoShare);
FillDB(db_.get(), 0, keys_iteration);
// corrupt a random table file in the DB directory
ASSERT_OK(CorruptRandomTableFileInDB());
// file_checksum_gen_factory is null, and thus table checksum is not
// verified for creating a new backup; no correction is detected
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
CloseDBAndBackupEngine();
// delete old files in db
ASSERT_OK(DestroyDB(dbname_, options_));
// Enable table file checksum in DB manifest
options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kNoShare);
FillDB(db_.get(), 0, keys_iteration);
// corrupt a random table file in the DB directory
ASSERT_OK(CorruptRandomTableFileInDB());
// table file checksum is enabled so we should be able to detect any
// corruption
ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
CloseDBAndBackupEngine();
}
// Test if BackupEngine will fail to create new backup if some table has been
// corrupted and the table file checksum is stored in the DB manifest for the
// case when backup table files will be stored in a shared directory
TEST_P(BackupableDBTestWithParam, TableFileCorruptedBeforeBackup) {
const int keys_iteration = 50000;
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0, keys_iteration);
// corrupt a random table file in the DB directory
ASSERT_OK(CorruptRandomTableFileInDB());
// cannot detect corruption since DB manifest has no table checksums
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
CloseDBAndBackupEngine();
// delete old files in db
ASSERT_OK(DestroyDB(dbname_, options_));
// Enable table checksums in DB manifest
options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0, keys_iteration);
// corrupt a random table file in the DB directory
ASSERT_OK(CorruptRandomTableFileInDB());
// corruption is detected
ASSERT_NOK(backup_engine_->CreateNewBackup(db_.get()));
CloseDBAndBackupEngine();
}
TEST_F(BackupableDBTest, TableFileCorruptedDuringBackup) {
const int keys_iteration = 50000;
std::vector<std::shared_ptr<FileChecksumGenFactory>> fac{
nullptr, GetFileChecksumGenCrc32cFactory()};
for (auto& f : fac) {
options_.file_checksum_gen_factory = f;
if (f == nullptr) {
// When share_files_with_checksum is on, we calculate checksums of table
// files before and after copying. So we can test whether a corruption has
// happened during the file is copied to backup directory.
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kShareWithChecksum);
} else {
// Default DB table file checksum is on, we calculate checksums of table
// files before copying to verify it with the one stored in DB manifest
// and also calculate checksum after copying. So we can test whether a
// corruption has happened during the file is copied to backup directory
// even if we do not place table files in shared_checksum directory.
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kNoShare);
}
FillDB(db_.get(), 0, keys_iteration);
bool corrupted = false;
// corrupt files when copying to the backup directory
SyncPoint::GetInstance()->SetCallBack(
"BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
[&](void* data) {
if (data != nullptr) {
Slice* d = reinterpret_cast<Slice*>(data);
if (!d->empty()) {
d->remove_suffix(1);
corrupted = true;
}
}
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = backup_engine_->CreateNewBackup(db_.get());
if (corrupted) {
ASSERT_NOK(s);
} else {
// should not in this path in normal cases
ASSERT_OK(s);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
CloseDBAndBackupEngine();
// delete old files in db
ASSERT_OK(DestroyDB(dbname_, options_));
}
}
TEST_P(BackupableDBTestWithParam,
TableFileCorruptedDuringBackupWithDefaultDbChecksum) {
const int keys_iteration = 100000;
options_.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0, keys_iteration);
bool corrupted = false;
// corrupt files when copying to the backup directory
SyncPoint::GetInstance()->SetCallBack(
"BackupEngineImpl::CopyOrCreateFile:CorruptionDuringBackup",
[&](void* data) {
if (data != nullptr) {
Slice* d = reinterpret_cast<Slice*>(data);
if (!d->empty()) {
d->remove_suffix(1);
corrupted = true;
}
}
});
SyncPoint::GetInstance()->EnableProcessing();
Status s = backup_engine_->CreateNewBackup(db_.get());
if (corrupted) {
ASSERT_NOK(s);
} else {
// should not in this path in normal cases
ASSERT_OK(s);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
CloseDBAndBackupEngine();
// delete old files in db
ASSERT_OK(DestroyDB(dbname_, options_));
}
TEST_F(BackupableDBTest, InterruptCreationTest) {
// Interrupt backup creation by failing new writes and failing cleanup of the
// partial state. Then verify a subsequent backup can still succeed.

View File

@ -118,7 +118,9 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType) {
uint64_t size_limit_bytes, FileType,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
@ -168,14 +170,16 @@ Status CheckpointImpl::CreateCustomCheckpoint(
std::function<Status(const std::string& src_dirname,
const std::string& src_fname, FileType type)>
link_file_cb,
std::function<Status(const std::string& src_dirname,
const std::string& src_fname,
uint64_t size_limit_bytes, FileType type)>
std::function<Status(
const std::string& src_dirname, const std::string& src_fname,
uint64_t size_limit_bytes, FileType type,
const std::string& checksum_func_name, const std::string& checksum_val)>
copy_file_cb,
std::function<Status(const std::string& fname, const std::string& contents,
FileType type)>
create_file_cb,
uint64_t* sequence_number, uint64_t log_size_for_flush) {
uint64_t* sequence_number, uint64_t log_size_for_flush,
const bool& get_live_table_checksum) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
@ -255,6 +259,13 @@ Status CheckpointImpl::CreateCustomCheckpoint(
size_t wal_size = live_wal_files.size();
// get table file checksums if get_live_table_checksum is true
std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
Status checksum_status;
if (get_live_table_checksum) {
checksum_status = db_->GetLiveFilesChecksumInfo(checksum_list.get());
}
// copy/hard link live_files
std::string manifest_fname, current_fname;
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
@ -292,9 +303,23 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}
}
if ((type != kTableFile) || (!same_fs)) {
std::string checksum_name = kUnknownFileChecksumFuncName;
std::string checksum_value = kUnknownFileChecksum;
// we ignore the checksums either they are not required or we failed to
// obtain the checksum lsit for old table files that have no file
// checksums
if (type == kTableFile && get_live_table_checksum &&
checksum_status.ok()) {
// find checksum info for table files
s = checksum_list->SearchOneFileChecksum(number, &checksum_value,
&checksum_name);
if (!s.ok()) {
return Status::NotFound("Can't find checksum for " + src_fname);
}
}
s = copy_file_cb(db_->GetName(), src_fname,
(type == kDescriptorFile) ? manifest_file_size : 0,
type);
(type == kDescriptorFile) ? manifest_file_size : 0, type,
checksum_name, checksum_value);
}
}
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
@ -313,7 +338,8 @@ Status CheckpointImpl::CreateCustomCheckpoint(
live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes(), kLogFile);
live_wal_files[i]->SizeFileBytes(), kLogFile,
kUnknownFileChecksumFuncName, kUnknownFileChecksum);
break;
}
if (same_fs) {
@ -327,7 +353,8 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}
if (!same_fs) {
s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
kLogFile);
kLogFile, kUnknownFileChecksumFuncName,
kUnknownFileChecksum);
}
}
}

View File

@ -51,12 +51,14 @@ class CheckpointImpl : public Checkpoint {
link_file_cb,
std::function<Status(const std::string& src_dirname,
const std::string& fname, uint64_t size_limit_bytes,
FileType type)>
FileType type, const std::string& checksum_func_name,
const std::string& checksum_val)>
copy_file_cb,
std::function<Status(const std::string& fname,
const std::string& contents, FileType type)>
create_file_cb,
uint64_t* sequence_number, uint64_t log_size_for_flush);
uint64_t* sequence_number, uint64_t log_size_for_flush,
const bool& get_live_table_checksum = false);
private:
void CleanStagingDirectory(const std::string& path, Logger* info_log);