mirror of https://github.com/facebook/rocksdb.git
Fix orphaned files in SstFileManager (#13015)
Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/13015 `Close()`ing a database now releases tracked files in `SstFileManager`. Previously this space would be leaked until the database was later reopened. Reviewed By: jowlyzhang Differential Revision: D62590773 fbshipit-source-id: 5461bd253d974ac4967ad52fee92e2650f8a9a28
This commit is contained in:
parent
f411c8bc97
commit
0611eb5b9d
|
@ -530,6 +530,11 @@ Status DBImpl::MaybeReleaseTimestampedSnapshotsAndCheck() {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
void DBImpl::UntrackDataFiles() {
|
||||
TrackOrUntrackFiles(/*existing_data_files=*/{},
|
||||
/*track=*/false);
|
||||
}
|
||||
|
||||
Status DBImpl::CloseHelper() {
|
||||
// Guarantee that there is no background error recovery in progress before
|
||||
// continuing with the shutdown
|
||||
|
@ -669,6 +674,13 @@ Status DBImpl::CloseHelper() {
|
|||
delete txn_entry.second;
|
||||
}
|
||||
|
||||
// Return an unowned SstFileManager to a consistent state
|
||||
if (immutable_db_options_.sst_file_manager && !own_sfm_) {
|
||||
mutex_.Unlock();
|
||||
UntrackDataFiles();
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
// versions need to be destroyed before table_cache since it can hold
|
||||
// references to table_cache.
|
||||
{
|
||||
|
@ -6747,6 +6759,62 @@ void DBImpl::RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds) {
|
|||
}
|
||||
}
|
||||
|
||||
void DBImpl::TrackOrUntrackFiles(
|
||||
const std::vector<std::string>& existing_data_files, bool track) {
|
||||
auto sfm = static_cast_with_check<SstFileManagerImpl>(
|
||||
immutable_db_options_.sst_file_manager.get());
|
||||
assert(sfm);
|
||||
std::vector<ColumnFamilyMetaData> metadata;
|
||||
GetAllColumnFamilyMetaData(&metadata);
|
||||
auto action = [&](const std::string& file_path,
|
||||
std::optional<uint64_t> size) {
|
||||
if (track) {
|
||||
if (size) {
|
||||
sfm->OnAddFile(file_path, *size).PermitUncheckedError();
|
||||
} else {
|
||||
sfm->OnAddFile(file_path).PermitUncheckedError();
|
||||
}
|
||||
} else {
|
||||
sfm->OnUntrackFile(file_path).PermitUncheckedError();
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_set<std::string> referenced_files;
|
||||
for (const auto& md : metadata) {
|
||||
for (const auto& lmd : md.levels) {
|
||||
for (const auto& fmd : lmd.files) {
|
||||
// We're assuming that each sst file name exists in at most one of
|
||||
// the paths.
|
||||
std::string file_path =
|
||||
fmd.directory + kFilePathSeparator + fmd.relative_filename;
|
||||
action(file_path, fmd.size);
|
||||
referenced_files.insert(file_path);
|
||||
}
|
||||
}
|
||||
for (const auto& bmd : md.blob_files) {
|
||||
std::string name = bmd.blob_file_name;
|
||||
// The BlobMetaData.blob_file_name may start with "/".
|
||||
if (!name.empty() && name[0] == kFilePathSeparator) {
|
||||
name = name.substr(1);
|
||||
}
|
||||
// We're assuming that each blob file name exists in at most one of
|
||||
// the paths.
|
||||
std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
|
||||
action(file_path, bmd.blob_file_size);
|
||||
referenced_files.insert(file_path);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& file_path : existing_data_files) {
|
||||
if (referenced_files.find(file_path) != referenced_files.end()) {
|
||||
continue;
|
||||
}
|
||||
// There shouldn't be any duplicated files. In case there is, SstFileManager
|
||||
// will take care of deduping it.
|
||||
action(file_path, /*size=*/std::nullopt);
|
||||
}
|
||||
}
|
||||
|
||||
void DBImpl::InstallSeqnoToTimeMappingInSV(
|
||||
std::vector<SuperVersionContext>* sv_contexts) {
|
||||
mutex_.AssertHeld();
|
||||
|
|
|
@ -1618,9 +1618,15 @@ class DBImpl : public DB {
|
|||
// vast majority of all files), since it already has the file size
|
||||
// on record, we don't need to query the file system. Otherwise, we query the
|
||||
// file system for the size of an unreferenced file.
|
||||
// REQUIRES: mutex unlocked
|
||||
void TrackExistingDataFiles(
|
||||
const std::vector<std::string>& existing_data_files);
|
||||
|
||||
// Untrack data files in sst manager. This is only called during DB::Close on
|
||||
// an unowned SstFileManager, to return it to a consistent state.
|
||||
// REQUIRES: mutex unlocked
|
||||
void UntrackDataFiles();
|
||||
|
||||
// SetDbSessionId() should be called in the constuctor DBImpl()
|
||||
// to ensure that db_session_id_ gets updated every time the DB is opened
|
||||
void SetDbSessionId();
|
||||
|
@ -2190,6 +2196,10 @@ class DBImpl : public DB {
|
|||
JobContext* job_context, LogBuffer* log_buffer,
|
||||
CompactionJobInfo* compaction_job_info);
|
||||
|
||||
// REQUIRES: mutex unlocked
|
||||
void TrackOrUntrackFiles(const std::vector<std::string>& existing_data_files,
|
||||
bool track);
|
||||
|
||||
ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
|
||||
|
||||
void MaybeScheduleFlushOrCompaction();
|
||||
|
|
|
@ -1988,46 +1988,7 @@ IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
|
|||
|
||||
void DBImpl::TrackExistingDataFiles(
|
||||
const std::vector<std::string>& existing_data_files) {
|
||||
auto sfm = static_cast<SstFileManagerImpl*>(
|
||||
immutable_db_options_.sst_file_manager.get());
|
||||
assert(sfm);
|
||||
std::vector<ColumnFamilyMetaData> metadata;
|
||||
GetAllColumnFamilyMetaData(&metadata);
|
||||
|
||||
std::unordered_set<std::string> referenced_files;
|
||||
for (const auto& md : metadata) {
|
||||
for (const auto& lmd : md.levels) {
|
||||
for (const auto& fmd : lmd.files) {
|
||||
// We're assuming that each sst file name exists in at most one of
|
||||
// the paths.
|
||||
std::string file_path =
|
||||
fmd.directory + kFilePathSeparator + fmd.relative_filename;
|
||||
sfm->OnAddFile(file_path, fmd.size).PermitUncheckedError();
|
||||
referenced_files.insert(file_path);
|
||||
}
|
||||
}
|
||||
for (const auto& bmd : md.blob_files) {
|
||||
std::string name = bmd.blob_file_name;
|
||||
// The BlobMetaData.blob_file_name may start with "/".
|
||||
if (!name.empty() && name[0] == kFilePathSeparator) {
|
||||
name = name.substr(1);
|
||||
}
|
||||
// We're assuming that each blob file name exists in at most one of
|
||||
// the paths.
|
||||
std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
|
||||
sfm->OnAddFile(file_path, bmd.blob_file_size).PermitUncheckedError();
|
||||
referenced_files.insert(file_path);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& file_path : existing_data_files) {
|
||||
if (referenced_files.find(file_path) != referenced_files.end()) {
|
||||
continue;
|
||||
}
|
||||
// There shouldn't be any duplicated files. In case there is, SstFileManager
|
||||
// will take care of deduping it.
|
||||
sfm->OnAddFile(file_path).PermitUncheckedError();
|
||||
}
|
||||
TrackOrUntrackFiles(existing_data_files, /*track=*/true);
|
||||
}
|
||||
|
||||
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
|
|
|
@ -383,12 +383,16 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
|
|||
ASSERT_EQ(files_moved, 0);
|
||||
|
||||
Close();
|
||||
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
|
||||
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
|
||||
Reopen(options);
|
||||
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
|
||||
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
|
||||
|
||||
// Verify that we track all the files again after the DB is closed and opened
|
||||
Close();
|
||||
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
|
||||
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
|
||||
sst_file_manager.reset(NewSstFileManager(env_));
|
||||
options.sst_file_manager = sst_file_manager;
|
||||
sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
|
||||
|
@ -439,6 +443,11 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
|
|||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
|
||||
|
||||
int64_t untracked_files = 0;
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SstFileManagerImpl::OnUntrackFile",
|
||||
[&](void* /*arg*/) { ++untracked_files; });
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
Options options = CurrentOptions();
|
||||
|
@ -485,6 +494,10 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
|
|||
}
|
||||
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
|
||||
Close();
|
||||
ASSERT_EQ(untracked_files, files_in_db.size());
|
||||
untracked_files = 0;
|
||||
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
|
||||
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
|
||||
|
||||
Reopen(options);
|
||||
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
|
||||
|
@ -492,6 +505,10 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
|
|||
|
||||
// Verify that we track all the files again after the DB is closed and opened.
|
||||
Close();
|
||||
ASSERT_EQ(untracked_files, files_in_db.size());
|
||||
untracked_files = 0;
|
||||
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
|
||||
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
|
||||
|
||||
sst_file_manager.reset(NewSstFileManager(env_));
|
||||
options.sst_file_manager = sst_file_manager;
|
||||
|
@ -507,6 +524,10 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
|
|||
ASSERT_EQ(files_deleted, 0);
|
||||
ASSERT_EQ(files_scheduled_to_delete, 0);
|
||||
Close();
|
||||
ASSERT_EQ(untracked_files, files_in_db.size());
|
||||
untracked_files = 0;
|
||||
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
|
||||
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
|
||||
assert(arg);
|
||||
|
@ -666,6 +687,9 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
|
|||
}
|
||||
|
||||
Close();
|
||||
ASSERT_EQ(sfm->GetTrackedFiles().size(), 0) << "sfm should be empty";
|
||||
ASSERT_EQ(sfm->GetTotalSize(), 0) << "sfm should be empty";
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
|
||||
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
|
||||
assert(arg);
|
||||
|
|
|
@ -118,6 +118,16 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SstFileManagerImpl::OnUntrackFile(const std::string& file_path) {
|
||||
{
|
||||
MutexLock l(&mu_);
|
||||
OnDeleteFileImpl(file_path);
|
||||
}
|
||||
TEST_SYNC_POINT_CALLBACK("SstFileManagerImpl::OnUntrackFile",
|
||||
const_cast<std::string*>(&file_path));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void SstFileManagerImpl::SetMaxAllowedSpaceUsage(uint64_t max_allowed_space) {
|
||||
MutexLock l(&mu_);
|
||||
max_allowed_space_ = max_allowed_space;
|
||||
|
|
|
@ -50,6 +50,9 @@ class SstFileManagerImpl : public SstFileManager {
|
|||
Status OnMoveFile(const std::string& old_path, const std::string& new_path,
|
||||
uint64_t* file_size = nullptr);
|
||||
|
||||
// DB will call OnUntrackFile when closing with an unowned SstFileManager.
|
||||
Status OnUntrackFile(const std::string& file_path);
|
||||
|
||||
// Update the maximum allowed space that should be used by RocksDB, if
|
||||
// the total size of the SST and blob files exceeds max_allowed_space, writes
|
||||
// to RocksDB will fail.
|
||||
|
@ -217,4 +220,3 @@ class SstFileManagerImpl : public SstFileManager {
|
|||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
DB::Close now untracks files in SstFileManager, making avaialble any space used
|
||||
by them. Prior to this change they would be orphaned until the DB is re-opened.
|
Loading…
Reference in New Issue