Add SstFileManager (component tracking all SST file in DBs and control the deletion rate)

Summary:
Add a new class SstFileTracker that will be notified whenever a DB add/delete/move and sst file, it will also replace DeleteScheduler
SstFileTracker can be used later to abort writes when we exceed a specific size

Test Plan: unit tests

Reviewers: rven, anthony, yhchiang, sdong

Reviewed By: sdong

Subscribers: igor, lovro, march, dhruba

Differential Revision: https://reviews.facebook.net/D50469
This commit is contained in:
Islam AbdelRahman 2016-01-28 18:35:01 -08:00
parent 45768ade4f
commit d6c838f1e1
20 changed files with 546 additions and 242 deletions

View file

@ -196,13 +196,14 @@ set(SOURCES
util/comparator.cc
util/concurrent_arena.cc
util/crc32c.cc
util/delete_scheduler_impl.cc
util/delete_scheduler.cc
util/dynamic_bloom.cc
util/env.cc
util/env_hdfs.cc
util/event_logger.cc
util/file_util.cc
util/file_reader_writer.cc
util/sst_file_manager_impl.cc
util/filter_policy.cc
util/hash.cc
util/histogram.cc

View file

@ -3,9 +3,11 @@
### Public API Changes
* Add a new perf context level between kEnableCount and kEnableTime. Level 2 now doesn't include timers for mutexes.
* Statistics of mutex operation durations will not be measured by default. If you want to have them enabled, you need to set Statistics::stats_level_ to kAll.
* DBOptions::delete_scheduler and NewDeleteScheduler() are removed, please use DBOptions::sst_file_manager and NewSstFileManager() instead
### New Features
* ldb tool now supports operations to non-default column families.
* Add DBOptions::sst_file_manager. Use NewSstFileManager() in include/rocksdb/sst_file_manager.h to create a SstFileManager that can be used to track the total size of SST files and control the SST files deletion rate.
## 4.4.0 (1/14/2016)
### Public API Changes

View file

@ -51,6 +51,7 @@
#include "util/iostats_context_imp.h"
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/sst_file_manager_impl.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/stop_watch.h"
@ -498,11 +499,16 @@ Status CompactionJob::Run() {
}
TablePropertiesCollection tp;
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
if (sfm && output.meta.fd.GetPathId() == 0) {
sfm->OnAddFile(fn);
}
}
}
compact_->compaction->SetOutputTableProperties(std::move(tp));

View file

@ -64,7 +64,6 @@
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/sst_file_writer.h"
@ -89,6 +88,7 @@
#include "util/log_buffer.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/sst_file_manager_impl.h"
#include "util/options_helper.h"
#include "util/options_parser.h"
#include "util/perf_context_imp.h"
@ -786,8 +786,8 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
}
#endif // !ROCKSDB_LITE
Status file_deletion_status;
if (type == kTableFile && path_id == 0) {
file_deletion_status = DeleteOrMoveToTrash(&db_options_, fname);
if (type == kTableFile) {
file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id);
} else {
file_deletion_status = env_->DeleteFile(fname);
}
@ -1509,6 +1509,14 @@ Status DBImpl::FlushMemTableToOutputFile(
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties());
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm) {
// Notify sst_file_manager that a new file was added
std::string file_path = MakeTableFileName(db_options_.db_paths[0].path,
file_meta.fd.GetNumber());
sfm->OnAddFile(file_path);
}
}
#endif // ROCKSDB_LITE
return s;
@ -5406,6 +5414,25 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
impl->mutex_.Unlock();
auto sfm = static_cast<SstFileManagerImpl*>(
impl->db_options_.sst_file_manager.get());
if (s.ok() && sfm) {
// Notify SstFileManager about all sst files that already exist in
// db_paths[0] when the DB is opened.
auto& db_path = impl->db_options_.db_paths[0];
std::vector<std::string> existing_files;
impl->db_options_.env->GetChildren(db_path.path, &existing_files);
for (auto& file_name : existing_files) {
uint64_t file_number;
FileType file_type;
std::string file_path = db_path.path + "/" + file_name;
if (ParseFileName(file_name, &file_number, &file_type) &&
file_type == kTableFile) {
sfm->OnAddFile(file_path);
}
}
}
if (s.ok()) {
Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p",
impl);
@ -5465,7 +5492,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile) {
del = DeleteOrMoveToTrash(&options, path_to_delete);
del = DeleteSSTFile(&options, path_to_delete, 0);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -5481,13 +5508,9 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
Status del;
std::string table_path = db_path.path + "/" + filenames[i];
if (path_id == 0) {
del = DeleteOrMoveToTrash(&options, table_path);
} else {
del = env->DeleteFile(table_path);
}
Status del = DeleteSSTFile(&options, table_path,
static_cast<uint32_t>(path_id));
if (result.ok() && !del.ok()) {
result = del;
}

View file

@ -37,9 +37,9 @@
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
@ -65,9 +65,10 @@
#include "util/compression.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/sst_file_manager_impl.h"
#include "util/statistics.h"
#include "util/testharness.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/mock_env.h"
#include "util/string_util.h"
@ -8431,15 +8432,78 @@ TEST_F(DBTest, DeletingOldWalAfterDrop) {
}
#ifndef ROCKSDB_LITE
TEST_F(DBTest, DBWithSstFileManager) {
std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
int files_added = 0;
int files_deleted = 0;
int files_moved = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnAddFile", [&](void* arg) { files_added++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnDeleteFile", [&](void* arg) { files_deleted++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"SstFileManagerImpl::OnMoveFile", [&](void* arg) { files_moved++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.sst_file_manager = sst_file_manager;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 25; i++) {
GenerateNewRandomFile(&rnd);
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// Verify that we are tracking all sst files in dbname_
ASSERT_EQ(sfm->GetTrackedFiles(), GetAllSSTFiles());
}
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
auto files_in_db = GetAllSSTFiles();
// Verify that we are tracking all sst files in dbname_
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
// Verify the total files size
uint64_t total_files_size = 0;
for (auto& file_to_size : files_in_db) {
total_files_size += file_to_size.second;
}
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
// We flushed at least 25 files
ASSERT_GE(files_added, 25);
// Compaction must have deleted some files
ASSERT_GT(files_deleted, 0);
// No files were moved
ASSERT_EQ(files_moved, 0);
Close();
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
// Verify that we track all the files again after the DB is closed and opened
Close();
sst_file_manager.reset(NewSstFileManager(env_));
options.sst_file_manager = sst_file_manager;
sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
Reopen(options);
ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBTest, RateLimitedDelete) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBTest::RateLimitedDelete:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
{"DBTest::RateLimitedDelete:1", "DeleteScheduler::BackgroundEmptyTrash"},
});
std::vector<uint64_t> penalties;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::BackgroundEmptyTrash:Wait",
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@ -8450,9 +8514,10 @@ TEST_F(DBTest, RateLimitedDelete) {
std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
options.delete_scheduler.reset(NewDeleteScheduler(
env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s));
options.sst_file_manager.reset(NewSstFileManager(
env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
ASSERT_OK(s);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
Destroy(last_options_);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -8479,7 +8544,7 @@ TEST_F(DBTest, RateLimitedDelete) {
uint64_t delete_start_time = env_->NowMicros();
// Hold BackgroundEmptyTrash
TEST_SYNC_POINT("DBTest::RateLimitedDelete:1");
options.delete_scheduler->WaitForEmptyTrash();
sfm->WaitForEmptyTrash();
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
uint64_t total_files_size = 0;
@ -8502,7 +8567,7 @@ TEST_F(DBTest, RateLimitedDelete) {
TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -8515,9 +8580,10 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
Status s;
options.delete_scheduler.reset(NewDeleteScheduler(
env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s));
options.sst_file_manager.reset(NewSstFileManager(
env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
ASSERT_OK(s);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
DestroyAndReopen(options);
@ -8551,7 +8617,7 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
ASSERT_EQ("0,2", FilesPerLevel(0));
options.delete_scheduler->WaitForEmptyTrash();
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 8);
compact_options.bottommost_level_compaction =
@ -8559,7 +8625,7 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
options.delete_scheduler->WaitForEmptyTrash();
sfm->WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 8);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@ -8568,7 +8634,7 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
TEST_F(DBTest, DestroyDBWithRateLimitedDelete) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
@ -8590,12 +8656,13 @@ TEST_F(DBTest, DestroyDBWithRateLimitedDelete) {
std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
Status s;
options.delete_scheduler.reset(NewDeleteScheduler(
env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s));
options.sst_file_manager.reset(NewSstFileManager(
env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
ASSERT_OK(s);
ASSERT_OK(DestroyDB(dbname_, options));
options.delete_scheduler->WaitForEmptyTrash();
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->WaitForEmptyTrash();
// We have deleted the 4 sst files in the delete_scheduler
ASSERT_EQ(bg_delete_file, 4);
}
@ -10073,7 +10140,6 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) {
ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist);
}
#endif // ROCKSDB_LITE
class SliceTransformLimitedDomain : public SliceTransform {

View file

@ -1005,4 +1005,22 @@ void DBTestBase::CopyFile(const std::string& source,
ASSERT_OK(destfile->Close());
}
std::unordered_map<std::string, uint64_t> DBTestBase::GetAllSSTFiles() {
std::unordered_map<std::string, uint64_t> res;
std::vector<std::string> files;
env_->GetChildren(dbname_, &files);
for (auto& file_name : files) {
uint64_t number;
FileType type;
std::string file_path = dbname_ + "/" + file_name;
if (ParseFileName(file_name, &number, &type) && type == kTableFile) {
uint64_t file_size = 0;
env_->GetFileSize(file_path, &file_size);
res[file_path] = file_size;
}
}
return res;
}
} // namespace rocksdb

View file

@ -19,6 +19,7 @@
#endif
#include <algorithm>
#include <map>
#include <set>
#include <string>
#include <thread>
@ -750,6 +751,8 @@ class DBTestBase : public testing::Test {
void CopyFile(const std::string& source, const std::string& destination,
uint64_t size = 0);
std::unordered_map<std::string, uint64_t> GetAllSSTFiles();
};
} // namespace rocksdb

View file

@ -1,67 +0,0 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <map>
#include <string>
#include <memory>
#include "rocksdb/status.h"
namespace rocksdb {
class Env;
class Logger;
// DeleteScheduler allow the DB to enforce a rate limit on file deletion,
// Instead of deleteing files immediately, files are moved to trash_dir
// and deleted in a background thread that apply sleep penlty between deletes
// if they are happening in a rate faster than rate_bytes_per_sec,
//
// Rate limiting can be turned off by setting rate_bytes_per_sec = 0, In this
// case DeleteScheduler will delete files immediately.
class DeleteScheduler {
public:
virtual ~DeleteScheduler() {}
// Return delete rate limit in bytes per second
virtual int64_t GetRateBytesPerSecond() = 0;
// Move file to trash directory and schedule it's deletion
virtual Status DeleteFile(const std::string& fname) = 0;
// Return a map containing errors that happened in the background thread
// file_path => error status
virtual std::map<std::string, Status> GetBackgroundErrors() = 0;
// Wait for all files being deleteing in the background to finish or for
// destructor to be called.
virtual void WaitForEmptyTrash() = 0;
};
// Create a new DeleteScheduler that can be shared among multiple RocksDB
// instances to control the file deletion rate.
//
// @env: Pointer to Env object, please see "rocksdb/env.h".
// @trash_dir: Path to the directory where deleted files will be moved into
// to be deleted in a background thread while applying rate limiting. If this
// directory dont exist, it will be created. This directory should not be
// used by any other process or any other DeleteScheduler.
// @rate_bytes_per_sec: How many bytes should be deleted per second, If this
// value is set to 1024 (1 Kb / sec) and we deleted a file of size 4 Kb
// in 1 second, we will wait for another 3 seconds before we delete other
// files, Set to 0 to disable rate limiting.
// @info_log: If not nullptr, info_log will be used to log errors.
// @delete_exisitng_trash: If set to true, the newly created DeleteScheduler
// will delete files that already exist in trash_dir.
// @status: If not nullptr, status will contain any errors that happened during
// creating the missing trash_dir or deleting existing files in trash.
extern DeleteScheduler* NewDeleteScheduler(
Env* env, const std::string& trash_dir, int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log = nullptr,
bool delete_exisitng_trash = true, Status* status = nullptr);
} // namespace rocksdb

View file

@ -33,6 +33,7 @@ class CompactionFilterFactory;
class Comparator;
class Env;
enum InfoLogLevel : unsigned char;
class SstFileManager;
class FilterPolicy;
class Logger;
class MergeOperator;
@ -41,7 +42,6 @@ class TableFactory;
class MemTableRepFactory;
class TablePropertiesCollectorFactory;
class RateLimiter;
class DeleteScheduler;
class Slice;
class SliceTransform;
class Statistics;
@ -830,12 +830,12 @@ struct DBOptions {
// Default: nullptr
std::shared_ptr<RateLimiter> rate_limiter;
// Use to control files deletion rate, can be used among multiple
// RocksDB instances. delete_scheduler is only used to delete table files that
// need to be deleted from the first db_path (db_name if db_paths is empty),
// other files types and other db_paths wont be affected by delete_scheduler.
// Default: nullptr (disabled)
std::shared_ptr<DeleteScheduler> delete_scheduler;
// Use to track SST files and control their file deletion rate, can be used
// among multiple RocksDB instances, sst_file_manager only track and throttle
// deletes of SST files in first db_path (db_name if db_paths is empty), other
// files and other db_paths wont be tracked or affected by sst_file_manager.
// Default: nullptr
std::shared_ptr<SstFileManager> sst_file_manager;
// Any internal progress/error information generated by the db will
// be written to info_log if it is non-nullptr, or to a file stored

View file

@ -0,0 +1,64 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include "rocksdb/status.h"
namespace rocksdb {
class Env;
class Logger;
// SstFileManager is used to track SST files in the DB and control there
// deletion rate.
// All SstFileManager public functions are thread-safe.
class SstFileManager {
public:
virtual ~SstFileManager() {}
// Return the total size of all tracked files.
// thread-safe
virtual uint64_t GetTotalSize() = 0;
// Return a map containing all tracked files and there corresponding sizes.
// thread-safe
virtual std::unordered_map<std::string, uint64_t> GetTrackedFiles() = 0;
// Return delete rate limit in bytes per second.
// thread-safe
virtual int64_t GetDeleteRateBytesPerSecond() = 0;
};
// Create a new SstFileManager that can be shared among multiple RocksDB
// instances to track SST file and control there deletion rate.
//
// @param env: Pointer to Env object, please see "rocksdb/env.h".
// @param info_log: If not nullptr, info_log will be used to log errors.
//
// == Deletion rate limiting specific arguments ==
// @param trash_dir: Path to the directory where deleted files will be moved
// to be deleted in a background thread while applying rate limiting. If this
// directory dont exist, it will be created. This directory should not be
// used by any other process or any other SstFileManager, Set to "" to
// disable deletion rate limiting.
// @param rate_bytes_per_sec: How many bytes should be deleted per second, If
// this value is set to 1024 (1 Kb / sec) and we deleted a file of size 4 Kb
// in 1 second, we will wait for another 3 seconds before we delete other
// files, Set to 0 to disable deletion rate limiting.
// @param delete_exisitng_trash: If set to true, the newly created
// SstFileManager will delete files that already exist in trash_dir.
// @param status: If not nullptr, status will contain any errors that happened
// during creating the missing trash_dir or deleting existing files in trash.
extern SstFileManager* NewSstFileManager(
Env* env, std::shared_ptr<Logger> info_log = nullptr,
std::string trash_dir = "", int64_t rate_bytes_per_sec = 0,
bool delete_exisitng_trash = true, Status* status = nullptr);
} // namespace rocksdb

3
src.mk
View file

@ -94,13 +94,14 @@ LIB_SOURCES = \
util/compaction_job_stats_impl.cc \
util/concurrent_arena.cc \
util/crc32c.cc \
util/delete_scheduler_impl.cc \
util/delete_scheduler.cc \
util/dynamic_bloom.cc \
util/env.cc \
util/env_hdfs.cc \
util/env_posix.cc \
util/io_posix.cc \
util/thread_posix.cc \
util/sst_file_manager_impl.cc \
util/file_util.cc \
util/file_reader_writer.cc \
util/filter_policy.cc \

View file

@ -3,38 +3,40 @@
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "util/delete_scheduler_impl.h"
#include "util/delete_scheduler.h"
#include <thread>
#include <vector>
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/sst_file_manager_impl.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
namespace rocksdb {
DeleteSchedulerImpl::DeleteSchedulerImpl(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log)
DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager)
: env_(env),
trash_dir_(trash_dir),
rate_bytes_per_sec_(rate_bytes_per_sec),
pending_files_(0),
closing_(false),
cv_(&mu_),
info_log_(info_log) {
if (rate_bytes_per_sec_ == 0) {
info_log_(info_log),
sst_file_manager_(sst_file_manager) {
if (rate_bytes_per_sec_ <= 0) {
// Rate limiting is disabled
bg_thread_.reset();
} else {
bg_thread_.reset(
new std::thread(&DeleteSchedulerImpl::BackgroundEmptyTrash, this));
new std::thread(&DeleteScheduler::BackgroundEmptyTrash, this));
}
}
DeleteSchedulerImpl::~DeleteSchedulerImpl() {
DeleteScheduler::~DeleteScheduler() {
{
MutexLock l(&mu_);
closing_ = true;
@ -45,20 +47,29 @@ DeleteSchedulerImpl::~DeleteSchedulerImpl() {
}
}
Status DeleteSchedulerImpl::DeleteFile(const std::string& file_path) {
if (rate_bytes_per_sec_ == 0) {
Status DeleteScheduler::DeleteFile(const std::string& file_path) {
Status s;
if (rate_bytes_per_sec_ <= 0) {
// Rate limiting is disabled
return env_->DeleteFile(file_path);
s = env_->DeleteFile(file_path);
if (s.ok() && sst_file_manager_) {
sst_file_manager_->OnDeleteFile(file_path);
}
return s;
}
// Move file to trash
std::string path_in_trash;
Status s = MoveToTrash(file_path, &path_in_trash);
s = MoveToTrash(file_path, &path_in_trash);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"Failed to move %s to trash directory (%s)", file_path.c_str(),
trash_dir_.c_str());
return env_->DeleteFile(file_path);
s = env_->DeleteFile(file_path);
if (s.ok() && sst_file_manager_) {
sst_file_manager_->OnDeleteFile(file_path);
}
return s;
}
// Add file to delete queue
@ -73,13 +84,13 @@ Status DeleteSchedulerImpl::DeleteFile(const std::string& file_path) {
return s;
}
std::map<std::string, Status> DeleteSchedulerImpl::GetBackgroundErrors() {
std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
MutexLock l(&mu_);
return bg_errors_;
}
Status DeleteSchedulerImpl::MoveToTrash(const std::string& file_path,
std::string* path_in_trash) {
Status DeleteScheduler::MoveToTrash(const std::string& file_path,
std::string* path_in_trash) {
Status s;
// Figure out the name of the file in trash folder
size_t idx = file_path.rfind("/");
@ -112,11 +123,14 @@ Status DeleteSchedulerImpl::MoveToTrash(const std::string& file_path,
break;
}
}
if (s.ok() && sst_file_manager_) {
sst_file_manager_->OnMoveFile(file_path, *path_in_trash);
}
return s;
}
void DeleteSchedulerImpl::BackgroundEmptyTrash() {
TEST_SYNC_POINT("DeleteSchedulerImpl::BackgroundEmptyTrash");
void DeleteScheduler::BackgroundEmptyTrash() {
TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
while (true) {
MutexLock l(&mu_);
@ -151,7 +165,7 @@ void DeleteSchedulerImpl::BackgroundEmptyTrash() {
uint64_t total_penlty =
((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_);
while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
TEST_SYNC_POINT_CALLBACK("DeleteSchedulerImpl::BackgroundEmptyTrash:Wait",
TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
&total_penlty);
pending_files_--;
@ -164,12 +178,12 @@ void DeleteSchedulerImpl::BackgroundEmptyTrash() {
}
}
Status DeleteSchedulerImpl::DeleteTrashFile(const std::string& path_in_trash,
uint64_t* deleted_bytes) {
Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
uint64_t* deleted_bytes) {
uint64_t file_size;
Status s = env_->GetFileSize(path_in_trash, &file_size);
if (s.ok()) {
TEST_SYNC_POINT("DeleteSchedulerImpl::DeleteTrashFile:DeleteFile");
TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
s = env_->DeleteFile(path_in_trash);
}
@ -181,51 +195,19 @@ Status DeleteSchedulerImpl::DeleteTrashFile(const std::string& path_in_trash,
*deleted_bytes = 0;
} else {
*deleted_bytes = file_size;
if (sst_file_manager_) {
sst_file_manager_->OnDeleteFile(path_in_trash);
}
}
return s;
}
void DeleteSchedulerImpl::WaitForEmptyTrash() {
void DeleteScheduler::WaitForEmptyTrash() {
MutexLock l(&mu_);
while (pending_files_ > 0 && !closing_) {
cv_.Wait();
}
}
DeleteScheduler* NewDeleteScheduler(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log,
bool delete_exisitng_trash,
Status* status) {
DeleteScheduler* res =
new DeleteSchedulerImpl(env, trash_dir, rate_bytes_per_sec, info_log);
Status s;
if (trash_dir != "") {
s = env->CreateDirIfMissing(trash_dir);
if (s.ok() && delete_exisitng_trash) {
std::vector<std::string> files_in_trash;
s = env->GetChildren(trash_dir, &files_in_trash);
if (s.ok()) {
for (const std::string& trash_file : files_in_trash) {
if (trash_file == "." || trash_file == "..") {
continue;
}
Status file_delete = res->DeleteFile(trash_dir + "/" + trash_file);
if (s.ok() && !file_delete.ok()) {
s = file_delete;
}
}
}
}
}
if (status) {
*status = s;
}
return res;
}
} // namespace rocksdb

View file

@ -12,21 +12,28 @@
#include "port/port.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/status.h"
namespace rocksdb {
class Env;
class Logger;
class SstFileManagerImpl;
class DeleteSchedulerImpl : public DeleteScheduler {
// DeleteScheduler allows the DB to enforce a rate limit on file deletion,
// Instead of deleteing files immediately, files are moved to trash_dir
// and deleted in a background thread that apply sleep penlty between deletes
// if they are happening in a rate faster than rate_bytes_per_sec,
//
// Rate limiting can be turned off by setting rate_bytes_per_sec = 0, In this
// case DeleteScheduler will delete files immediately.
class DeleteScheduler {
public:
DeleteSchedulerImpl(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log);
DeleteScheduler(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager);
~DeleteSchedulerImpl();
~DeleteScheduler();
// Return delete rate limit in bytes per second
int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; }
@ -63,7 +70,7 @@ class DeleteSchedulerImpl : public DeleteScheduler {
int32_t pending_files_;
// Errors that happened in BackgroundEmptyTrash (file_path => error)
std::map<std::string, Status> bg_errors_;
// Set to true in ~DeleteSchedulerImpl() to force BackgroundEmptyTrash to stop
// Set to true in ~DeleteScheduler() to force BackgroundEmptyTrash to stop
bool closing_;
// Condition variable signaled in these conditions
// - pending_files_ value change from 0 => 1
@ -74,7 +81,8 @@ class DeleteSchedulerImpl : public DeleteScheduler {
std::unique_ptr<std::thread> bg_thread_;
// Mutex to protect threads from file name conflicts
port::Mutex file_move_mu_;
std::shared_ptr<Logger> info_log_;
Logger* info_log_;
SstFileManagerImpl* sst_file_manager_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};

View file

@ -12,9 +12,9 @@
#include <thread>
#include <vector>
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "util/delete_scheduler.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
@ -74,6 +74,12 @@ class DeleteSchedulerTest : public testing::Test {
return file_path;
}
void NewDeleteScheduler() {
ASSERT_OK(env_->CreateDirIfMissing(trash_dir_));
delete_scheduler_.reset(new DeleteScheduler(
env_, trash_dir_, rate_bytes_per_sec_, nullptr, nullptr));
}
Env* env_;
std::string dummy_files_dir_;
std::string trash_dir_;
@ -84,19 +90,19 @@ class DeleteSchedulerTest : public testing::Test {
// Test the basic functionality of DeleteScheduler (Rate Limiting).
// 1- Create 100 dummy files
// 2- Delete the 100 dummy files using DeleteScheduler
// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash ---
// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
// 3- Wait for DeleteScheduler to delete all files in trash
// 4- Verify that BackgroundEmptyTrash used to correct penlties for the files
// 5- Make sure that all created files were completely deleted
TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DeleteSchedulerTest::BasicRateLimiting:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
"DeleteScheduler::BackgroundEmptyTrash"},
});
std::vector<uint64_t> penalties;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::BackgroundEmptyTrash:Wait",
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
int num_files = 100; // 100 files
@ -110,8 +116,7 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
DestroyAndCreateDir(dummy_files_dir_);
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
// Create 100 dummy files, every file is 1 Kb
std::vector<std::string> generated_files;
@ -152,19 +157,19 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
// Same as the BasicRateLimiting test but delete files in multiple threads.
// 1- Create 100 dummy files
// 2- Delete the 100 dummy files using DeleteScheduler using 10 threads
// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash ---
// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
// 3- Wait for DeleteScheduler to delete all files in queue
// 4- Verify that BackgroundEmptyTrash used to correct penlties for the files
// 5- Make sure that all created files were completely deleted
TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DeleteSchedulerTest::RateLimitingMultiThreaded:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
"DeleteScheduler::BackgroundEmptyTrash"},
});
std::vector<uint64_t> penalties;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::BackgroundEmptyTrash:Wait",
"DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
int thread_cnt = 10;
@ -179,8 +184,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
DestroyAndCreateDir(dummy_files_dir_);
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
// Create 100 dummy files, every file is 1 Kb
std::vector<std::string> generated_files;
@ -239,12 +243,13 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
delete_scheduler_.reset(NewDeleteScheduler(env_, "", 0));
rate_bytes_per_sec_ = 0;
NewDeleteScheduler();
for (int i = 0; i < 10; i++) {
// Every file we delete will be deleted immediately
@ -264,18 +269,17 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
// 1- Create 10 files with the same name "conflict.data"
// 2- Delete the 10 files using DeleteScheduler
// 3- Make sure that trash directory contain 10 files ("conflict.data" x 10)
// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash ---
// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
// 4- Make sure that files are deleted from trash
TEST_F(DeleteSchedulerTest, ConflictNames) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DeleteSchedulerTest::ConflictNames:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
"DeleteScheduler::BackgroundEmptyTrash"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
// Create "conflict.data" and move it to trash 10 times
for (int i = 0; i < 10; i++) {
@ -300,19 +304,18 @@ TEST_F(DeleteSchedulerTest, ConflictNames) {
// 1- Create 10 dummy files
// 2- Delete the 10 files using DeleteScheduler (move them to trsah)
// 3- Delete the 10 files directly (using env_->DeleteFile)
// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash ---
// --- Hold DeleteScheduler::BackgroundEmptyTrash ---
// 4- Make sure that DeleteScheduler failed to delete the 10 files and
// reported 10 background errors
TEST_F(DeleteSchedulerTest, BackgroundError) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DeleteSchedulerTest::BackgroundError:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
"DeleteScheduler::BackgroundEmptyTrash"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
// Generate 10 dummy files and move them to trash
for (int i = 0; i < 10; i++) {
@ -339,32 +342,6 @@ TEST_F(DeleteSchedulerTest, BackgroundError) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// 1- Create 10 files in trash
// 2- Create a DeleteScheduler with delete_exisitng_trash = true
// 3- Wait for DeleteScheduler to delete all files in queue
// 4- Make sure that all files in trash directory were deleted
TEST_F(DeleteSchedulerTest, TrashWithExistingFiles) {
std::vector<std::string> dummy_files;
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
std::string trash_path = trash_dir_ + "/" + file_name;
env_->RenameFile(NewDummyFile(file_name), trash_path);
}
ASSERT_EQ(CountFilesInDir(trash_dir_), 10);
Status s;
rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
delete_scheduler_.reset(NewDeleteScheduler(
env_, trash_dir_, rate_bytes_per_sec_, nullptr, true, &s));
ASSERT_OK(s);
delete_scheduler_->WaitForEmptyTrash();
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
}
// 1- Create 10 dummy files
// 2- Delete 10 dummy files using DeleteScheduler
// 3- Wait for DeleteScheduler to delete all files in queue
@ -373,13 +350,12 @@ TEST_F(DeleteSchedulerTest, TrashWithExistingFiles) {
TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / sec
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
// Move files to trash, wait for empty trash, start again
for (int run = 1; run <= 5; run++) {
@ -409,13 +385,12 @@ TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {
TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1; // 1 Byte / sec
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
for (int i = 0; i < 100; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
@ -439,13 +414,12 @@ TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) {
TEST_F(DeleteSchedulerTest, MoveToTrashError) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
"DeleteScheduler::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024; // 1 Kb / sec
delete_scheduler_.reset(
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_));
NewDeleteScheduler();
// We will delete the trash directory, that mean that DeleteScheduler wont
// be able to move files to trash and will delete files them immediately.
@ -460,7 +434,6 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb
int main(int argc, char** argv) {

View file

@ -8,9 +8,9 @@
#include <string>
#include <algorithm>
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "util/sst_file_manager_impl.h"
#include "util/file_reader_writer.h"
namespace rocksdb {
@ -66,12 +66,15 @@ Status CopyFile(Env* env, const std::string& source,
return Status::OK();
}
Status DeleteOrMoveToTrash(const DBOptions* db_options,
const std::string& fname) {
if (db_options->delete_scheduler == nullptr) {
return db_options->env->DeleteFile(fname);
Status DeleteSSTFile(const DBOptions* db_options, const std::string& fname,
uint32_t path_id) {
// TODO(tec): support sst_file_manager for multiple path_ids
auto sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());
if (sfm && path_id == 0) {
return sfm->ScheduleFileDeletion(fname);
} else {
return db_options->delete_scheduler->DeleteFile(fname);
return db_options->env->DeleteFile(fname);
}
}

View file

@ -16,7 +16,7 @@ namespace rocksdb {
extern Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size = 0);
extern Status DeleteOrMoveToTrash(const DBOptions* db_options,
const std::string& fname);
extern Status DeleteSSTFile(const DBOptions* db_options,
const std::string& fname, uint32_t path_id);
} // namespace rocksdb

View file

@ -20,8 +20,8 @@
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/comparator.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/slice.h"
@ -213,7 +213,7 @@ DBOptions::DBOptions()
paranoid_checks(true),
env(Env::Default()),
rate_limiter(nullptr),
delete_scheduler(nullptr),
sst_file_manager(nullptr),
info_log(nullptr),
#ifdef NDEBUG
info_log_level(INFO_LEVEL),
@ -281,7 +281,7 @@ DBOptions::DBOptions(const Options& options)
paranoid_checks(options.paranoid_checks),
env(options.env),
rate_limiter(options.rate_limiter),
delete_scheduler(options.delete_scheduler),
sst_file_manager(options.sst_file_manager),
info_log(options.info_log),
info_log_level(options.info_log_level),
max_open_files(options.max_open_files),
@ -433,8 +433,9 @@ void DBOptions::Dump(Logger* log) const {
use_adaptive_mutex);
Header(log, " Options.rate_limiter: %p",
rate_limiter.get());
Header(log, " Options.delete_scheduler.rate_bytes_per_sec: %" PRIi64,
delete_scheduler ? delete_scheduler->GetRateBytesPerSecond() : 0);
Header(
log, " Options.sst_file_manager.rate_bytes_per_sec: %" PRIi64,
sst_file_manager ? sst_file_manager->GetDeleteRateBytesPerSecond() : 0);
Header(log, " Options.bytes_per_sync: %" PRIu64,
bytes_per_sync);
Header(log, " Options.wal_bytes_per_sync: %" PRIu64,

View file

@ -1609,8 +1609,8 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) {
{offsetof(struct DBOptions, env), sizeof(Env*)},
{offsetof(struct DBOptions, rate_limiter),
sizeof(std::shared_ptr<RateLimiter>)},
{offsetof(struct DBOptions, delete_scheduler),
sizeof(std::shared_ptr<DeleteScheduler>)},
{offsetof(struct DBOptions, sst_file_manager),
sizeof(std::shared_ptr<SstFileManager>)},
{offsetof(struct DBOptions, info_log), sizeof(std::shared_ptr<Logger>)},
{offsetof(struct DBOptions, statistics),
sizeof(std::shared_ptr<Statistics>)},

View file

@ -0,0 +1,143 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "util/sst_file_manager_impl.h"
#include <vector>
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
namespace rocksdb {
SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
const std::string& trash_dir,
int64_t rate_bytes_per_sec)
: env_(env),
logger_(logger),
total_files_size_(0),
delete_scheduler_(env, trash_dir, rate_bytes_per_sec, logger.get(),
this) {}
SstFileManagerImpl::~SstFileManagerImpl() {}
Status SstFileManagerImpl::OnAddFile(const std::string& file_path) {
uint64_t file_size;
Status s = env_->GetFileSize(file_path, &file_size);
if (s.ok()) {
MutexLock l(&mu_);
OnAddFileImpl(file_path, file_size);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
return s;
}
Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) {
{
MutexLock l(&mu_);
OnDeleteFileImpl(file_path);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile");
return Status::OK();
}
Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
const std::string& new_path) {
{
MutexLock l(&mu_);
OnAddFileImpl(new_path, tracked_files_[old_path]);
OnDeleteFileImpl(old_path);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
return Status::OK();
}
uint64_t SstFileManagerImpl::GetTotalSize() {
MutexLock l(&mu_);
return total_files_size_;
}
std::unordered_map<std::string, uint64_t>
SstFileManagerImpl::GetTrackedFiles() {
MutexLock l(&mu_);
return tracked_files_;
}
int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() {
return delete_scheduler_.GetRateBytesPerSecond();
}
Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) {
return delete_scheduler_.DeleteFile(file_path);
}
void SstFileManagerImpl::WaitForEmptyTrash() {
delete_scheduler_.WaitForEmptyTrash();
}
void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
uint64_t file_size) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file != tracked_files_.end()) {
// File was added before, we will just update the size
total_files_size_ -= tracked_file->second;
total_files_size_ += file_size;
} else {
total_files_size_ += file_size;
}
tracked_files_[file_path] = file_size;
}
void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file == tracked_files_.end()) {
// File is not tracked
return;
}
total_files_size_ -= tracked_file->second;
tracked_files_.erase(tracked_file);
}
SstFileManager* NewSstFileManager(Env* env, std::shared_ptr<Logger> info_log,
std::string trash_dir,
int64_t rate_bytes_per_sec,
bool delete_exisitng_trash, Status* status) {
SstFileManagerImpl* res =
new SstFileManagerImpl(env, info_log, trash_dir, rate_bytes_per_sec);
Status s;
if (trash_dir != "" && rate_bytes_per_sec > 0) {
s = env->CreateDirIfMissing(trash_dir);
if (s.ok() && delete_exisitng_trash) {
std::vector<std::string> files_in_trash;
s = env->GetChildren(trash_dir, &files_in_trash);
if (s.ok()) {
for (const std::string& trash_file : files_in_trash) {
if (trash_file == "." || trash_file == "..") {
continue;
}
std::string path_in_trash = trash_dir + "/" + trash_file;
res->OnAddFile(path_in_trash);
Status file_delete = res->ScheduleFileDeletion(path_in_trash);
if (s.ok() && !file_delete.ok()) {
s = file_delete;
}
}
}
}
}
if (status) {
*status = s;
}
return res;
}
} // namespace rocksdb

View file

@ -0,0 +1,77 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "port/port.h"
#include "rocksdb/sst_file_manager.h"
#include "util/delete_scheduler.h"
namespace rocksdb {
class Env;
class Logger;
// SstFileManager is used to track SST files in the DB and control there
// deletion rate.
// All SstFileManager public functions are thread-safe.
class SstFileManagerImpl : public SstFileManager {
public:
explicit SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
const std::string& trash_dir,
int64_t rate_bytes_per_sec);
~SstFileManagerImpl();
// DB will call OnAddFile whenever a new sst file is added.
Status OnAddFile(const std::string& file_path);
// DB will call OnDeleteFile whenever an sst file is deleted.
Status OnDeleteFile(const std::string& file_path);
// DB will call OnMoveFile whenever an sst file is move to a new path.
Status OnMoveFile(const std::string& old_path, const std::string& new_path);
// Return the total size of all tracked files.
uint64_t GetTotalSize() override;
// Return a map containing all tracked files and there corresponding sizes.
std::unordered_map<std::string, uint64_t> GetTrackedFiles() override;
// Return delete rate limit in bytes per second.
virtual int64_t GetDeleteRateBytesPerSecond() override;
// Move file to trash directory and schedule it's deletion.
virtual Status ScheduleFileDeletion(const std::string& file_path);
// Wait for all files being deleteing in the background to finish or for
// destructor to be called.
virtual void WaitForEmptyTrash();
private:
// REQUIRES: mutex locked
void OnAddFileImpl(const std::string& file_path, uint64_t file_size);
// REQUIRES: mutex locked
void OnDeleteFileImpl(const std::string& file_path);
Env* env_;
std::shared_ptr<Logger> logger_;
// Mutex to protect tracked_files_, total_files_size_
port::Mutex mu_;
// The summation of the sizes of all files in tracked_files_ map
uint64_t total_files_size_;
// A map containing all tracked files and there sizes
// file_path => file_size
std::unordered_map<std::string, uint64_t> tracked_files_;
// DeleteScheduler used to throttle file deletition, if SstFileManagerImpl was
// created with rate_bytes_per_sec == 0 or trash_dir == "", delete_scheduler_
// rate limiting will be disabled and will simply delete the files.
DeleteScheduler delete_scheduler_;
};
} // namespace rocksdb