Fix file deletions in DestroyDB not rate limited (#12891)

Summary:
Make `DestroyDB` slowly delete files if it's configured and enabled via `SstFileManager`.

It's currently not available mainly because of DeleteScheduler's logic related to tracked total_size_ and total_trash_size_. These accounting and logic should not be applied to `DestroyDB`. This PR adds a `DeleteUnaccountedDBFile` util for this purpose which deletes files without accounting it.  This util also supports assigning a file to a specified trash bucket so that user can later wait for a specific trash bucket to be empty. For `DestroyDB`, files with more than 1 hard links will be deleted immediately.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12891

Test Plan: Added unit tests, existing tests.

Reviewed By: anand1976

Differential Revision: D60300220

Pulled By: jowlyzhang

fbshipit-source-id: 8b18109a177a3a9532f6dc2e40e08310c08ca3c7
This commit is contained in:
Yu Zhang 2024-08-02 19:31:55 -07:00 committed by Facebook GitHub Bot
parent 9d5c8c89a1
commit d12aaf23ca
14 changed files with 591 additions and 63 deletions

View File

@ -17,6 +17,7 @@
#include <cstdio>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <sstream>
#include <stdexcept>
@ -5240,6 +5241,14 @@ Status DestroyDB(const std::string& dbname, const Options& options,
Env* env = soptions.env;
std::vector<std::string> filenames;
bool wal_in_db_path = soptions.IsWalDirSameAsDBPath();
auto sfm = static_cast_with_check<SstFileManagerImpl>(
options.sst_file_manager.get());
// Allocate a separate trash bucket to be used by all the to be deleted
// files, so we can later wait for this bucket to be empty before return.
std::optional<int32_t> bucket;
if (sfm) {
bucket = sfm->NewTrashBucket();
}
// Reset the logger because it holds a handle to the
// log file and prevents cleanup and directory removal
@ -5251,6 +5260,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
/*IODebugContext*=*/nullptr)
.PermitUncheckedError();
std::set<std::string> paths_to_delete;
FileLock* lock;
const std::string lockname = LockFileName(dbname);
Status result = env->LockFile(lockname, &lock);
@ -5267,10 +5277,9 @@ Status DestroyDB(const std::string& dbname, const Options& options,
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile || type == kWalFile ||
type == kBlobFile) {
del = DeleteDBFile(
&soptions, path_to_delete, dbname,
del = DeleteUnaccountedDBFile(&soptions, path_to_delete, dbname,
/*force_bg=*/false,
/*force_fg=*/(type == kWalFile) ? !wal_in_db_path : false);
/*force_fg=*/false, bucket);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -5279,6 +5288,7 @@ Status DestroyDB(const std::string& dbname, const Options& options,
}
}
}
paths_to_delete.insert(dbname);
std::set<std::string> paths;
for (const DbPath& db_path : options.db_paths) {
@ -5300,18 +5310,19 @@ Status DestroyDB(const std::string& dbname, const Options& options,
(type == kTableFile ||
type == kBlobFile)) { // Lock file will be deleted at end
std::string file_path = path + "/" + fname;
Status del = DeleteDBFile(&soptions, file_path, dbname,
/*force_bg=*/false, /*force_fg=*/false);
Status del = DeleteUnaccountedDBFile(&soptions, file_path, dbname,
/*force_bg=*/false,
/*force_fg=*/false, bucket);
if (!del.ok() && result.ok()) {
result = del;
}
}
}
// TODO: Should we return an error if we cannot delete the directory?
env->DeleteDir(path).PermitUncheckedError();
}
}
paths_to_delete.merge(paths);
std::vector<std::string> walDirFiles;
std::string archivedir = ArchivalDirectory(dbname);
bool wal_dir_exists = false;
@ -5335,46 +5346,49 @@ Status DestroyDB(const std::string& dbname, const Options& options,
// Delete archival files.
for (const auto& file : archiveFiles) {
if (ParseFileName(file, &number, &type) && type == kWalFile) {
Status del =
DeleteDBFile(&soptions, archivedir + "/" + file, archivedir,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path);
Status del = DeleteUnaccountedDBFile(
&soptions, archivedir + "/" + file, archivedir,
/*force_bg=*/false, /*force_fg=*/!wal_in_db_path, bucket);
if (!del.ok() && result.ok()) {
result = del;
}
}
}
// Ignore error in case dir contains other files
env->DeleteDir(archivedir).PermitUncheckedError();
paths_to_delete.insert(archivedir);
}
// Delete log files in the WAL dir
if (wal_dir_exists) {
for (const auto& file : walDirFiles) {
if (ParseFileName(file, &number, &type) && type == kWalFile) {
Status del =
DeleteDBFile(&soptions, LogFileName(soptions.wal_dir, number),
Status del = DeleteUnaccountedDBFile(
&soptions, LogFileName(soptions.wal_dir, number),
soptions.wal_dir, /*force_bg=*/false,
/*force_fg=*/!wal_in_db_path);
/*force_fg=*/!wal_in_db_path, bucket);
if (!del.ok() && result.ok()) {
result = del;
}
}
}
// Ignore error in case dir contains other files
env->DeleteDir(soptions.wal_dir).PermitUncheckedError();
paths_to_delete.insert(soptions.wal_dir);
}
// Ignore error since state is already gone
env->UnlockFile(lock).PermitUncheckedError();
env->DeleteFile(lockname).PermitUncheckedError();
// Make sure trash files are all cleared before return.
if (sfm && bucket.has_value()) {
sfm->WaitForEmptyTrashBucket(bucket.value());
}
// sst_file_manager holds a ref to the logger. Make sure the logger is
// gone before trying to remove the directory.
soptions.sst_file_manager.reset();
// Ignore error in case dir contains other files
env->DeleteDir(dbname).PermitUncheckedError();
;
for (const auto& path_to_delete : paths_to_delete) {
env->DeleteDir(path_to_delete).PermitUncheckedError();
}
}
return result;
}

View File

@ -1226,6 +1226,8 @@ class DBImpl : public DB {
return logs_.back().number;
}
void TEST_DeleteObsoleteFiles();
const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
return files_grabbed_for_purge_;
}

View File

@ -314,6 +314,11 @@ const autovector<uint64_t>& DBImpl::TEST_GetFilesToQuarantine() const {
return error_handler_.GetFilesToQuarantine();
}
void DBImpl::TEST_DeleteObsoleteFiles() {
InstrumentedMutexLock l(&mutex_);
DeleteObsoleteFiles();
}
size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
InstrumentedMutexLock l(&const_cast<DBImpl*>(this)->stats_history_mutex_);
return EstimateInMemoryStatsHistorySize();

View File

@ -507,6 +507,23 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFiles) {
ASSERT_EQ(files_deleted, 0);
ASSERT_EQ(files_scheduled_to_delete, 0);
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_EQ(files_deleted, blob_files.size());
ASSERT_EQ(files_scheduled_to_delete, blob_files.size());
@ -649,6 +666,23 @@ TEST_F(DBSSTTest, DBWithSstFileManagerForBlobFilesWithGC) {
}
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ASSERT_OK(DestroyDB(dbname_, options));
sfm->WaitForEmptyTrash();
ASSERT_EQ(files_deleted, 5);
@ -887,7 +921,7 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
// before restarting the DB.
// We have to set this on the 2nd to last file for it to delay deletion
// on the last file. (Quirk of DeleteScheduler::BackgroundEmptyTrash())
options.sst_file_manager->SetDeleteRateBytesPerSecond(1);
options.sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
}
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
@ -1902,6 +1936,24 @@ TEST_F(DBSSTTest, DBWithSFMForBlobFilesAtomicFlush) {
ASSERT_EQ(files_deleted, 1);
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion", [&](void* arg) {
assert(arg);
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
++files_scheduled_to_delete;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::OnDeleteFile", [&](void* arg) {
const std::string* const file_path =
static_cast<const std::string*>(arg);
if (EndsWith(*file_path, ".blob")) {
files_deleted++;
}
});
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_EQ(files_scheduled_to_delete, 4);

View File

@ -31,6 +31,7 @@ DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
total_trash_size_(0),
rate_bytes_per_sec_(rate_bytes_per_sec),
pending_files_(0),
next_trash_bucket_(0),
bytes_max_delete_chunk_(bytes_max_delete_chunk),
closing_(false),
cv_(&mu_),
@ -66,10 +67,8 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
total_trash_size_.load() > total_size * max_trash_db_ratio_.load())) {
// Rate limiting is disabled or trash size makes up more than
// max_trash_db_ratio_ (default 25%) of the total DB size
TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
Status s = DeleteFileImmediately(file_path, /*accounted=*/true);
if (s.ok()) {
s = sst_file_manager_->OnDeleteFile(file_path);
ROCKS_LOG_INFO(info_log_,
"Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
", total_trash_size %" PRIu64 ", total_size %" PRIi64
@ -77,15 +76,57 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
file_path.c_str(), rate_bytes_per_sec_.load(),
total_trash_size_.load(), total_size,
max_trash_db_ratio_.load());
}
return s;
}
return AddFileToDeletionQueue(file_path, dir_to_sync, /*bucket=*/std::nullopt,
/*accounted=*/true);
}
Status DeleteScheduler::DeleteUnaccountedFile(const std::string& file_path,
const std::string& dir_to_sync,
const bool force_bg,
std::optional<int32_t> bucket) {
uint64_t num_hard_links = 1;
fs_->NumFileLinks(file_path, IOOptions(), &num_hard_links, nullptr)
.PermitUncheckedError();
// We can tolerate rare races where we might immediately delete both links
// to a file.
if (rate_bytes_per_sec_.load() <= 0 || (!force_bg && num_hard_links > 1)) {
Status s = DeleteFileImmediately(file_path, /*accounted=*/false);
if (s.ok()) {
ROCKS_LOG_INFO(info_log_,
"Deleted file %s immediately, rate_bytes_per_sec %" PRIi64,
file_path.c_str(), rate_bytes_per_sec_.load());
}
return s;
}
return AddFileToDeletionQueue(file_path, dir_to_sync, bucket,
/*accounted=*/false);
}
Status DeleteScheduler::DeleteFileImmediately(const std::string& file_path,
bool accounted) {
TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteFile::cb",
const_cast<std::string*>(&file_path));
Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
if (s.ok()) {
s = OnDeleteFile(file_path, accounted);
InstrumentedMutexLock l(&mu_);
RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
}
return s;
}
Status DeleteScheduler::AddFileToDeletionQueue(const std::string& file_path,
const std::string& dir_to_sync,
std::optional<int32_t> bucket,
bool accounted) {
// Move file to trash
std::string trash_file;
Status s = MarkAsTrash(file_path, &trash_file);
Status s = MarkAsTrash(file_path, accounted, &trash_file);
ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
s.ToString().c_str());
@ -94,7 +135,7 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
file_path.c_str(), s.ToString().c_str());
s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
if (s.ok()) {
s = sst_file_manager_->OnDeleteFile(file_path);
s = OnDeleteFile(file_path, accounted);
ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
trash_file.c_str());
InstrumentedMutexLock l(&mu_);
@ -104,12 +145,14 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
}
// Update the total trash size
if (accounted) {
uint64_t trash_file_size = 0;
IOStatus io_s =
fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
if (io_s.ok()) {
total_trash_size_.fetch_add(trash_file_size);
}
}
//**TODO: What should we do if we failed to
// get the file size?
@ -117,8 +160,15 @@ Status DeleteScheduler::DeleteFile(const std::string& file_path,
{
InstrumentedMutexLock l(&mu_);
RecordTick(stats_.get(), FILES_MARKED_TRASH);
queue_.emplace(trash_file, dir_to_sync);
queue_.emplace(trash_file, dir_to_sync, accounted, bucket);
pending_files_++;
if (bucket.has_value()) {
auto iter = pending_files_in_buckets_.find(bucket.value());
assert(iter != pending_files_in_buckets_.end());
if (iter != pending_files_in_buckets_.end()) {
iter->second++;
}
}
if (pending_files_ == 1) {
cv_.SignalAll();
}
@ -177,7 +227,7 @@ Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
}
Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
std::string* trash_file) {
bool accounted, std::string* trash_file) {
// Sanity check of the path
size_t idx = file_path.rfind('/');
if (idx == std::string::npos || idx == file_path.size() - 1) {
@ -211,7 +261,7 @@ Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
}
cnt++;
}
if (s.ok()) {
if (s.ok() && accounted) {
s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
}
return s;
@ -235,6 +285,8 @@ void DeleteScheduler::BackgroundEmptyTrash() {
uint64_t total_deleted_bytes = 0;
int64_t current_delete_rate = rate_bytes_per_sec_.load();
while (!queue_.empty() && !closing_) {
// Satisfy static analysis.
std::optional<int32_t> bucket = std::nullopt;
if (current_delete_rate != rate_bytes_per_sec_.load()) {
// User changed the delete rate
current_delete_rate = rate_bytes_per_sec_.load();
@ -247,14 +299,17 @@ void DeleteScheduler::BackgroundEmptyTrash() {
// Get new file to delete
const FileAndDir& fad = queue_.front();
std::string path_in_trash = fad.fname;
std::string dir_to_sync = fad.dir;
bool accounted = fad.accounted;
bucket = fad.bucket;
// We don't need to hold the lock while deleting the file
mu_.Unlock();
uint64_t deleted_bytes = 0;
bool is_complete = true;
// Delete file from trash and update total_penlty value
Status s =
DeleteTrashFile(path_in_trash, fad.dir, &deleted_bytes, &is_complete);
Status s = DeleteTrashFile(path_in_trash, dir_to_sync, accounted,
&deleted_bytes, &is_complete);
total_deleted_bytes += deleted_bytes;
mu_.Lock();
if (is_complete) {
@ -288,12 +343,20 @@ void DeleteScheduler::BackgroundEmptyTrash() {
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
&total_penalty);
int32_t pending_files_in_bucket = std::numeric_limits<int32_t>::max();
if (is_complete) {
pending_files_--;
if (bucket.has_value()) {
auto iter = pending_files_in_buckets_.find(bucket.value());
assert(iter != pending_files_in_buckets_.end());
if (iter != pending_files_in_buckets_.end()) {
pending_files_in_bucket = iter->second--;
}
if (pending_files_ == 0) {
// Unblock WaitForEmptyTrash since there are no more files waiting
// to be deleted
}
}
if (pending_files_ == 0 || pending_files_in_bucket == 0) {
// Unblock WaitForEmptyTrash or WaitForEmptyTrashBucket since there are
// no more files waiting to be deleted
cv_.SignalAll();
}
}
@ -302,12 +365,14 @@ void DeleteScheduler::BackgroundEmptyTrash() {
Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
const std::string& dir_to_sync,
uint64_t* deleted_bytes,
bool accounted, uint64_t* deleted_bytes,
bool* is_complete) {
uint64_t file_size;
Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
*is_complete = true;
TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteTrashFile::cb",
const_cast<std::string*>(&path_in_trash));
if (s.ok()) {
bool need_full_delete = true;
if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
@ -374,7 +439,7 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
}
if (s.ok()) {
*deleted_bytes = file_size;
s = sst_file_manager_->OnDeleteFile(path_in_trash);
s = OnDeleteFile(path_in_trash, accounted);
}
}
}
@ -384,12 +449,24 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
path_in_trash.c_str(), s.ToString().c_str());
*deleted_bytes = 0;
} else {
if (accounted) {
total_trash_size_.fetch_sub(*deleted_bytes);
}
}
return s;
}
Status DeleteScheduler::OnDeleteFile(const std::string& file_path,
bool accounted) {
if (accounted) {
return sst_file_manager_->OnDeleteFile(file_path);
}
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::OnDeleteFile",
const_cast<std::string*>(&file_path));
return Status::OK();
}
void DeleteScheduler::WaitForEmptyTrash() {
InstrumentedMutexLock l(&mu_);
while (pending_files_ > 0 && !closing_) {
@ -397,6 +474,30 @@ void DeleteScheduler::WaitForEmptyTrash() {
}
}
std::optional<int32_t> DeleteScheduler::NewTrashBucket() {
if (rate_bytes_per_sec_.load() <= 0) {
return std::nullopt;
}
InstrumentedMutexLock l(&mu_);
int32_t bucket_number = next_trash_bucket_++;
pending_files_in_buckets_.emplace(bucket_number, 0);
return bucket_number;
}
void DeleteScheduler::WaitForEmptyTrashBucket(int32_t bucket) {
InstrumentedMutexLock l(&mu_);
if (bucket >= next_trash_bucket_) {
return;
}
auto iter = pending_files_in_buckets_.find(bucket);
while (iter != pending_files_in_buckets_.end() && iter->second > 0 &&
!closing_) {
cv_.Wait();
iter = pending_files_in_buckets_.find(bucket);
}
pending_files_in_buckets_.erase(bucket);
}
void DeleteScheduler::MaybeCreateBackgroundThread() {
if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
bg_thread_.reset(

View File

@ -7,6 +7,7 @@
#include <map>
#include <optional>
#include <queue>
#include <string>
#include <thread>
@ -48,16 +49,45 @@ class DeleteScheduler {
MaybeCreateBackgroundThread();
}
// Mark file as trash directory and schedule its deletion. If force_bg is
// set, it forces the file to always be deleted in the background thread,
// except when rate limiting is disabled
// Delete an accounted file that is tracked by `SstFileManager` and should be
// tracked by this `DeleteScheduler` when it's deleted.
// The file is deleted immediately if slow deletion is disabled. If force_bg
// is not set and trash to db size ratio exceeded the configured threshold,
// it is immediately deleted too. In all other cases, the file will be moved
// to a trash directory and scheduled for deletion by a background thread.
Status DeleteFile(const std::string& fname, const std::string& dir_to_sync,
const bool force_bg = false);
// Wait for all files being deleteing in the background to finish or for
// Delete an unaccounted file that is not tracked by `SstFileManager` and
// should not be tracked by this `DeleteScheduler` when it's deleted.
// The file is deleted immediately if slow deletion is disabled. If force_bg
// is not set and the file have more than 1 hard link, it is immediately
// deleted too. In all other cases, the file will be moved to a trash
// directory and scheduled for deletion by a background thread.
// This API also supports assign a file to a specified bucket created by
// `NewTrashBucket` when delete files in the background. So the caller can
// wait for a specific bucket to be empty by checking the
// `WaitForEmptyTrashBucket` API.
Status DeleteUnaccountedFile(const std::string& file_path,
const std::string& dir_to_sync,
const bool force_bg = false,
std::optional<int32_t> bucket = std::nullopt);
// Wait for all files being deleted in the background to finish or for
// destructor to be called.
void WaitForEmptyTrash();
// Creates a new trash bucket. A bucket is only created and returned when slow
// deletion is enabled.
// For each bucket that is created, the user should also call
// `WaitForEmptyTrashBucket` after scheduling file deletions to make sure the
// trash files are all cleared.
std::optional<int32_t> NewTrashBucket();
// Wait for all the files in the specified bucket to be deleted in the
// background or for the destructor to be called.
void WaitForEmptyTrashBucket(int32_t bucket);
// Return a map containing errors that happened in BackgroundEmptyTrash
// file_path => error status
std::map<std::string, Status> GetBackgroundErrors();
@ -87,12 +117,21 @@ class DeleteScheduler {
}
private:
Status MarkAsTrash(const std::string& file_path, std::string* path_in_trash);
Status DeleteFileImmediately(const std::string& file_path, bool accounted);
Status AddFileToDeletionQueue(const std::string& file_path,
const std::string& dir_to_sync,
std::optional<int32_t> bucket, bool accounted);
Status MarkAsTrash(const std::string& file_path, bool accounted,
std::string* path_in_trash);
Status DeleteTrashFile(const std::string& path_in_trash,
const std::string& dir_to_sync,
const std::string& dir_to_sync, bool accounted,
uint64_t* deleted_bytes, bool* is_complete);
Status OnDeleteFile(const std::string& file_path, bool accounted);
void BackgroundEmptyTrash();
void MaybeCreateBackgroundThread();
@ -104,19 +143,28 @@ class DeleteScheduler {
std::atomic<uint64_t> total_trash_size_;
// Maximum number of bytes that should be deleted per second
std::atomic<int64_t> rate_bytes_per_sec_;
// Mutex to protect queue_, pending_files_, bg_errors_, closing_, stats_
// Mutex to protect queue_, pending_files_, next_trash_bucket_,
// pending_files_in_buckets_, bg_errors_, closing_, stats_
InstrumentedMutex mu_;
struct FileAndDir {
FileAndDir(const std::string& f, const std::string& d) : fname(f), dir(d) {}
FileAndDir(const std::string& _fname, const std::string& _dir,
bool _accounted, std::optional<int32_t> _bucket)
: fname(_fname), dir(_dir), accounted(_accounted), bucket(_bucket) {}
std::string fname;
std::string dir; // empty will be skipped.
bool accounted;
std::optional<int32_t> bucket;
};
// Queue of trash files that need to be deleted
std::queue<FileAndDir> queue_;
// Number of trash files that are waiting to be deleted
int32_t pending_files_;
// Next trash bucket that can be created
int32_t next_trash_bucket_;
// A mapping from trash bucket to number of pending files in the bucket
std::map<int32_t, int32_t> pending_files_in_buckets_;
uint64_t bytes_max_delete_chunk_;
// Errors that happened in BackgroundEmptyTrash (file_path => error)
std::map<std::string, Status> bg_errors_;
@ -127,6 +175,7 @@ class DeleteScheduler {
// Condition variable signaled in these conditions
// - pending_files_ value change from 0 => 1
// - pending_files_ value change from 1 => 0
// - a value in pending_files_in_buckets change from 1 => 0
// - closing_ value is set to true
InstrumentedCondVar cv_;
// Background thread running BackgroundEmptyTrash
@ -138,6 +187,10 @@ class DeleteScheduler {
// If the trash size constitutes for more than this fraction of the total DB
// size we will start deleting new files passed to DeleteScheduler
// immediately
// Unaccounted files passed for deletion will not cause change in
// total_trash_size_ or affect the DeleteScheduler::total_trash_size_ over
// SstFileManager::total_size_ ratio. Their slow deletion is not subject to
// this configured threshold either.
std::atomic<double> max_trash_db_ratio_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
std::shared_ptr<Statistics> stats_;

View File

@ -78,7 +78,7 @@ class DeleteSchedulerTest : public testing::Test {
}
std::string NewDummyFile(const std::string& file_name, uint64_t size = 1024,
size_t dummy_files_dirs_idx = 0) {
size_t dummy_files_dirs_idx = 0, bool track = true) {
std::string file_path =
dummy_files_dirs_[dummy_files_dirs_idx] + "/" + file_name;
std::unique_ptr<WritableFile> f;
@ -86,7 +86,9 @@ class DeleteSchedulerTest : public testing::Test {
std::string data(size, 'A');
EXPECT_OK(f->Append(data));
EXPECT_OK(f->Close());
if (track) {
EXPECT_OK(sst_file_mgr_->OnAddFile(file_path));
}
return file_path;
}
@ -353,6 +355,8 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
ASSERT_EQ(num_files,
stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
ASSERT_FALSE(delete_scheduler_->NewTrashBucket().has_value());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
@ -718,6 +722,141 @@ TEST_F(DeleteSchedulerTest, IsTrashCheck) {
ASSERT_FALSE(DeleteScheduler::IsTrashFile("abc.trashx"));
}
TEST_F(DeleteSchedulerTest, DeleteAccountedAndUnaccountedFiles) {
rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / s
NewDeleteScheduler();
// Create 100 files, every file is 1 KB
int num_files = 100; // 100 files
uint64_t file_size = 1024; // 1 KB as a file size
std::vector<std::string> generated_files;
for (int i = 0; i < num_files; i++) {
std::string file_name = "file" + std::to_string(i) + ".data";
generated_files.push_back(NewDummyFile(file_name, file_size,
/*dummy_files_dirs_idx*/ 0,
/*track=*/false));
}
for (int i = 0; i < num_files; i++) {
if (i % 2) {
ASSERT_OK(sst_file_mgr_->OnAddFile(generated_files[i], file_size));
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i], ""));
} else {
ASSERT_OK(
delete_scheduler_->DeleteUnaccountedFile(generated_files[i], ""));
}
}
delete_scheduler_->WaitForEmptyTrash();
ASSERT_EQ(0, delete_scheduler_->GetTotalTrashSize());
ASSERT_EQ(0, sst_file_mgr_->GetTotalSize());
}
TEST_F(DeleteSchedulerTest, ConcurrentlyDeleteUnaccountedFilesInBuckets) {
int bg_delete_file = 0;
int fg_delete_file = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* /*arg*/) { bg_delete_file++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile", [&](void* /*arg*/) { fg_delete_file++; });
rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / s
NewDeleteScheduler();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Create 1000 files, every file is 1 KB
int num_files = 1000;
uint64_t file_size = 1024; // 1 KB as a file size
std::vector<std::string> generated_files;
for (int i = 0; i < num_files; i++) {
std::string file_name = "file" + std::to_string(i) + ".data";
generated_files.push_back(NewDummyFile(file_name, file_size,
/*dummy_files_dirs_idx*/ 0,
/*track=*/false));
}
// Concurrently delete files in different buckets and check all the buckets
// are empty.
int thread_cnt = 10;
int files_per_thread = 100;
std::atomic<int> thread_num(0);
std::vector<port::Thread> threads;
std::function<void()> delete_thread = [&]() {
std::optional<int32_t> bucket = delete_scheduler_->NewTrashBucket();
ASSERT_TRUE(bucket.has_value());
int idx = thread_num.fetch_add(1);
int range_start = idx * files_per_thread;
int range_end = range_start + files_per_thread;
for (int j = range_start; j < range_end; j++) {
ASSERT_OK(delete_scheduler_->DeleteUnaccountedFile(
generated_files[j], "", /*false_bg=*/false, bucket));
}
delete_scheduler_->WaitForEmptyTrashBucket(bucket.value());
};
for (int i = 0; i < thread_cnt; i++) {
threads.emplace_back(delete_thread);
}
for (size_t i = 0; i < threads.size(); i++) {
threads[i].join();
}
ASSERT_EQ(0, delete_scheduler_->GetTotalTrashSize());
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
ASSERT_EQ(1000, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(0, fg_delete_file);
ASSERT_EQ(1000, bg_delete_file);
// OK to re check an already empty bucket
delete_scheduler_->WaitForEmptyTrashBucket(9);
// Invalid bucket return too.
delete_scheduler_->WaitForEmptyTrashBucket(100);
std::optional<int32_t> next_bucket = delete_scheduler_->NewTrashBucket();
ASSERT_TRUE(next_bucket.has_value());
ASSERT_EQ(10, next_bucket.value());
delete_scheduler_->WaitForEmptyTrashBucket(10);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DeleteSchedulerTest,
ImmediatelyDeleteUnaccountedFilesWithRemainingLinks) {
int bg_delete_file = 0;
int fg_delete_file = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* /*arg*/) { bg_delete_file++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile", [&](void* /*arg*/) { fg_delete_file++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / sec
NewDeleteScheduler();
std::string file1 = NewDummyFile("data_1", 500 * 1024,
/*dummy_files_dirs_idx*/ 0, /*track=*/false);
std::string file2 = NewDummyFile("data_2", 100 * 1024,
/*dummy_files_dirs_idx*/ 0, /*track=*/false);
ASSERT_OK(env_->LinkFile(file1, dummy_files_dirs_[0] + "/data_1b"));
ASSERT_OK(env_->LinkFile(file2, dummy_files_dirs_[0] + "/data_2b"));
// Should delete in 4 batch if there is no hardlink
ASSERT_OK(
delete_scheduler_->DeleteUnaccountedFile(file1, "", /*force_bg=*/false));
ASSERT_OK(
delete_scheduler_->DeleteUnaccountedFile(file2, "", /*force_bg=*/false));
delete_scheduler_->WaitForEmptyTrash();
ASSERT_EQ(0, delete_scheduler_->GetTotalTrashSize());
ASSERT_EQ(0, bg_delete_file);
ASSERT_EQ(2, fg_delete_file);
ASSERT_EQ(0, stats_->getAndResetTickerCount(FILES_MARKED_TRASH));
ASSERT_EQ(2, stats_->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

View File

@ -125,8 +125,8 @@ IOStatus CreateFile(FileSystem* fs, const std::string& destination,
Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& dir_to_sync,
const bool force_bg, const bool force_fg) {
SstFileManagerImpl* sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
SstFileManagerImpl* sfm = static_cast_with_check<SstFileManagerImpl>(
db_options->sst_file_manager.get());
if (sfm && !force_fg) {
return sfm->ScheduleFileDeletion(fname, dir_to_sync, force_bg);
} else {
@ -134,6 +134,21 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
}
}
Status DeleteUnaccountedDBFile(const ImmutableDBOptions* db_options,
const std::string& fname,
const std::string& dir_to_sync,
const bool force_bg, const bool force_fg,
std::optional<int32_t> bucket) {
SstFileManagerImpl* sfm = static_cast_with_check<SstFileManagerImpl>(
db_options->sst_file_manager.get());
if (sfm && !force_fg) {
return sfm->ScheduleUnaccountedFileDeletion(fname, dir_to_sync, force_bg,
bucket);
} else {
return db_options->env->DeleteFile(fname);
}
}
// requested_checksum_func_name brings the function name of the checksum
// generator in checksum_factory. Empty string is permitted, in which case the
// name of the generator created by the factory is unchecked. When

View File

@ -55,6 +55,16 @@ Status DeleteDBFile(const ImmutableDBOptions* db_options,
const std::string& fname, const std::string& path_to_sync,
const bool force_bg, const bool force_fg);
// Delete an unaccounted DB file that is not tracked by SstFileManager and will
// not be tracked by its DeleteScheduler when getting deleted.
// If a legitimate bucket is provided and this file is scheduled for slow
// deletion, it will be assigned to the specified trash bucket.
Status DeleteUnaccountedDBFile(const ImmutableDBOptions* db_options,
const std::string& fname,
const std::string& dir_to_sync,
const bool force_bg, const bool force_fg,
std::optional<int32_t> bucket);
// TODO(hx235): pass the whole DBOptions intead of its individual fields
IOStatus GenerateOneFileChecksum(
FileSystem* fs, const std::string& file_path,

View File

@ -421,10 +421,28 @@ Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path,
return delete_scheduler_.DeleteFile(file_path, path_to_sync, force_bg);
}
Status SstFileManagerImpl::ScheduleUnaccountedFileDeletion(
const std::string& file_path, const std::string& dir_to_sync,
const bool force_bg, std::optional<int32_t> bucket) {
TEST_SYNC_POINT_CALLBACK(
"SstFileManagerImpl::ScheduleUnaccountedFileDeletion",
const_cast<std::string*>(&file_path));
return delete_scheduler_.DeleteUnaccountedFile(file_path, dir_to_sync,
force_bg, bucket);
}
void SstFileManagerImpl::WaitForEmptyTrash() {
delete_scheduler_.WaitForEmptyTrash();
}
std::optional<int32_t> SstFileManagerImpl::NewTrashBucket() {
return delete_scheduler_.NewTrashBucket();
}
void SstFileManagerImpl::WaitForEmptyTrashBucket(int32_t bucket) {
delete_scheduler_.WaitForEmptyTrashBucket(bucket);
}
void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
uint64_t file_size) {
auto tracked_file = tracked_files_.find(file_path);

View File

@ -5,7 +5,7 @@
#pragma once
#include <optional>
#include <string>
#include "db/compaction/compaction.h"
@ -118,17 +118,40 @@ class SstFileManagerImpl : public SstFileManager {
// not guaranteed
bool CancelErrorRecovery(ErrorHandler* db);
// Mark file as trash and schedule it's deletion. If force_bg is set, it
// Mark a file as trash and schedule its deletion. If force_bg is set, it
// forces the file to be deleting in the background regardless of DB size,
// except when rate limited delete is disabled
// except when rate limited delete is disabled.
virtual Status ScheduleFileDeletion(const std::string& file_path,
const std::string& dir_to_sync,
const bool force_bg = false);
// Wait for all files being deleteing in the background to finish or for
// Delete an unaccounted file. The file is deleted immediately if slow
// deletion is disabled. A file with more than 1 hard links will be deleted
// immediately unless force_bg is set. In other cases, files will be scheduled
// for slow deletion, and assigned to the specified bucket if a legitimate one
// is provided. A legitimate bucket is one that is created with the
// `NewTrashBucket` API, and for which `WaitForEmptyTrashBucket` hasn't been
// called yet.
virtual Status ScheduleUnaccountedFileDeletion(
const std::string& file_path, const std::string& dir_to_sync,
const bool force_bg = false,
std::optional<int32_t> bucket = std::nullopt);
// Wait for all files being deleted in the background to finish or for
// destructor to be called.
virtual void WaitForEmptyTrash();
// Creates a new trash bucket. A legitimate bucket is only created and
// returned when slow deletion is enabled.
// For each bucket that is created and used, the user should also call
// `WaitForEmptyTrashBucket` after scheduling file deletions to make sure all
// the trash files are cleared.
std::optional<int32_t> NewTrashBucket();
// Wait for all the files in the specified bucket to be deleted in the
// background or for destructor to be called.
virtual void WaitForEmptyTrashBucket(int32_t bucket);
DeleteScheduler* delete_scheduler() { return &delete_scheduler_; }
// Stop the error recovery background thread. This should be called only

View File

@ -0,0 +1 @@
*Make DestroyDB supports slow deletion when it's configured in `SstFileManager`. The slow deletion is subject to the configured `rate_bytes_per_sec`, but not subject to the `max_trash_db_ratio`.

View File

@ -760,7 +760,7 @@ TEST_F(BlobDBTest, SstFileManager) {
// run the same test for Get(), MultiGet() and Iterator each.
std::shared_ptr<SstFileManager> sst_file_manager(
NewSstFileManager(mock_env_.get()));
sst_file_manager->SetDeleteRateBytesPerSecond(1);
sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
SstFileManagerImpl *sfm =
static_cast<SstFileManagerImpl *>(sst_file_manager.get());
@ -818,7 +818,7 @@ TEST_F(BlobDBTest, SstFileManagerRestart) {
// run the same test for Get(), MultiGet() and Iterator each.
std::shared_ptr<SstFileManager> sst_file_manager(
NewSstFileManager(mock_env_.get()));
sst_file_manager->SetDeleteRateBytesPerSecond(1);
sst_file_manager->SetDeleteRateBytesPerSecond(1024 * 1024);
SstFileManagerImpl *sfm =
static_cast<SstFileManagerImpl *>(sst_file_manager.get());

View File

@ -25,6 +25,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
@ -1031,6 +1032,100 @@ TEST_F(CheckpointTest, CheckpointWithArchievedLog) {
delete snapshot_db;
}
class CheckpointDestroyTest : public CheckpointTest,
public testing::WithParamInterface<bool> {};
TEST_P(CheckpointDestroyTest, DisableEnableSlowDeletion) {
bool slow_deletion = GetParam();
Options options = CurrentOptions();
options.num_levels = 2;
options.disable_auto_compactions = true;
Status s;
options.sst_file_manager.reset(NewSstFileManager(
options.env, options.info_log, "", slow_deletion ? 1024 * 1024 : 0,
false /* delete_existing_trash */, &s, 1));
ASSERT_OK(s);
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "a"));
ASSERT_OK(Flush());
ASSERT_OK(Put("bar", "b"));
ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put("bar", "val" + std::to_string(i)));
ASSERT_OK(Flush());
}
ASSERT_EQ(NumTableFilesAtLevel(0), 10);
ASSERT_EQ(NumTableFilesAtLevel(1), 2);
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
delete checkpoint;
checkpoint = nullptr;
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 2);
DB* snapshot_db;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
ReadOptions read_opts;
std::string get_result;
ASSERT_OK(snapshot_db->Get(read_opts, "foo", &get_result));
ASSERT_EQ("a", get_result);
ASSERT_OK(snapshot_db->Get(read_opts, "bar", &get_result));
ASSERT_EQ("val9", get_result);
delete snapshot_db;
// Make sure original obsolete files for hard linked files are all deleted.
DBImpl* db_impl = static_cast_with_check<DBImpl>(db_);
db_impl->TEST_DeleteObsoleteFiles();
auto sfm = static_cast_with_check<SstFileManagerImpl>(
options.sst_file_manager.get());
ASSERT_NE(nullptr, sfm);
sfm->WaitForEmptyTrash();
// SST file 2-12 for "bar" will be compacted into one file on L1 during the
// compaction after checkpoint is created. SST file 1 on L1: foo, seq:
// 1 (hard links is 1 after checkpoint destroy)
std::atomic<int> bg_delete_sst{0};
std::atomic<int> fg_delete_sst{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteFile::cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto file_name = *static_cast<std::string*>(arg);
if (file_name.size() >= 4 &&
file_name.compare(file_name.size() - 4, 4, ".sst") == 0) {
fg_delete_sst.fetch_add(1);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DeleteScheduler::DeleteTrashFile::cb", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto file_name = *static_cast<std::string*>(arg);
if (file_name.size() >= 10 &&
file_name.compare(file_name.size() - 10, 10, ".sst.trash") == 0) {
bg_delete_sst.fetch_add(1);
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(DestroyDB(snapshot_name_, options));
if (slow_deletion) {
ASSERT_EQ(fg_delete_sst, 1);
ASSERT_EQ(bg_delete_sst, 11);
} else {
ASSERT_EQ(fg_delete_sst, 12);
}
ASSERT_EQ("a", Get("foo"));
ASSERT_EQ("val9", Get("bar"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(CheckpointDestroyTest, CheckpointDestroyTest,
::testing::Values(true, false));
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {