mirror of
https://github.com/facebook/rocksdb.git
synced 2024-11-30 04:41:49 +00:00
586d78b31e
Summary:
Context: In pull request https://github.com/facebook/rocksdb/issues/11436, we are introducing a new public API `WaitForCompact(const WaitForCompactOptions& wait_for_compact_options)`. This API invokes the internal implementation `WaitForCompact(bool wait_unscheduled=false)`. The `wait_unscheduled` parameter indicates the compactions that are not yet scheduled but are required to process items in the queue.

In certain cases, we are unable to wait for compactions, such as during a shutdown or when background jobs are paused. It is important to return the appropriate status in these scenarios. For all other cases, we should wait for all compaction and flush jobs, including the unscheduled ones. The primary purpose of this new API is to wait until the system has resolved its compaction debt. Currently, the usage of `wait_unscheduled` is limited to test code.

This pull request eliminates the usage of `wait_unscheduled`. The internal `WaitForCompact()` API now waits for unscheduled compactions unless the db is undergoing a shutdown. In the event of a shutdown, the API returns `Status::ShutdownInProgress()`.

Additionally, a new parameter, `abort_on_pause`, has been introduced with a default value of `false`. This parameter addresses the possibility of waiting indefinitely for unscheduled jobs if `PauseBackgroundWork()` was called before `WaitForCompact()` is invoked. By setting `abort_on_pause` to `true`, the API will immediately return `Status::Aborted`.

Furthermore, all tests that previously called `WaitForCompact(true)` have been fixed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11443

Test Plan: Existing tests that involve a shutdown in progress:
- DBCompactionTest::CompactRangeShutdownWhileDelayed
- DBTestWithParam::PreShutdownMultipleCompaction
- DBTestWithParam::PreShutdownCompactionMiddle

Reviewed By: pdillinger

Differential Revision: D45923426

Pulled By: jaykorean

fbshipit-source-id: 7dc93fe6a6841a7d9d2d72866fa647090dba8eae
955 lines
31 KiB
C++
955 lines
31 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
|
|
#include "db/db_test_util.h"
|
|
#include "port/stack_trace.h"
|
|
#include "table/unique_id_impl.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class MyTestCompactionService : public CompactionService {
|
|
public:
|
|
MyTestCompactionService(
|
|
std::string db_path, Options& options,
|
|
std::shared_ptr<Statistics>& statistics,
|
|
std::vector<std::shared_ptr<EventListener>>& listeners,
|
|
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
|
table_properties_collector_factories)
|
|
: db_path_(std::move(db_path)),
|
|
options_(options),
|
|
statistics_(statistics),
|
|
start_info_("na", "na", "na", 0, Env::TOTAL),
|
|
wait_info_("na", "na", "na", 0, Env::TOTAL),
|
|
listeners_(listeners),
|
|
table_properties_collector_factories_(
|
|
std::move(table_properties_collector_factories)) {}
|
|
|
|
static const char* kClassName() { return "MyTestCompactionService"; }
|
|
|
|
const char* Name() const override { return kClassName(); }
|
|
|
|
CompactionServiceJobStatus StartV2(
|
|
const CompactionServiceJobInfo& info,
|
|
const std::string& compaction_service_input) override {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
start_info_ = info;
|
|
assert(info.db_name == db_path_);
|
|
jobs_.emplace(info.job_id, compaction_service_input);
|
|
CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess;
|
|
if (is_override_start_status_) {
|
|
return override_start_status_;
|
|
}
|
|
return s;
|
|
}
|
|
|
|
CompactionServiceJobStatus WaitForCompleteV2(
|
|
const CompactionServiceJobInfo& info,
|
|
std::string* compaction_service_result) override {
|
|
std::string compaction_input;
|
|
assert(info.db_name == db_path_);
|
|
{
|
|
InstrumentedMutexLock l(&mutex_);
|
|
wait_info_ = info;
|
|
auto i = jobs_.find(info.job_id);
|
|
if (i == jobs_.end()) {
|
|
return CompactionServiceJobStatus::kFailure;
|
|
}
|
|
compaction_input = std::move(i->second);
|
|
jobs_.erase(i);
|
|
}
|
|
|
|
if (is_override_wait_status_) {
|
|
return override_wait_status_;
|
|
}
|
|
|
|
CompactionServiceOptionsOverride options_override;
|
|
options_override.env = options_.env;
|
|
options_override.file_checksum_gen_factory =
|
|
options_.file_checksum_gen_factory;
|
|
options_override.comparator = options_.comparator;
|
|
options_override.merge_operator = options_.merge_operator;
|
|
options_override.compaction_filter = options_.compaction_filter;
|
|
options_override.compaction_filter_factory =
|
|
options_.compaction_filter_factory;
|
|
options_override.prefix_extractor = options_.prefix_extractor;
|
|
options_override.table_factory = options_.table_factory;
|
|
options_override.sst_partitioner_factory = options_.sst_partitioner_factory;
|
|
options_override.statistics = statistics_;
|
|
if (!listeners_.empty()) {
|
|
options_override.listeners = listeners_;
|
|
}
|
|
|
|
if (!table_properties_collector_factories_.empty()) {
|
|
options_override.table_properties_collector_factories =
|
|
table_properties_collector_factories_;
|
|
}
|
|
|
|
OpenAndCompactOptions options;
|
|
options.canceled = &canceled_;
|
|
|
|
Status s = DB::OpenAndCompact(
|
|
options, db_path_, db_path_ + "/" + std::to_string(info.job_id),
|
|
compaction_input, compaction_service_result, options_override);
|
|
if (is_override_wait_result_) {
|
|
*compaction_service_result = override_wait_result_;
|
|
}
|
|
compaction_num_.fetch_add(1);
|
|
if (s.ok()) {
|
|
return CompactionServiceJobStatus::kSuccess;
|
|
} else {
|
|
return CompactionServiceJobStatus::kFailure;
|
|
}
|
|
}
|
|
|
|
int GetCompactionNum() { return compaction_num_.load(); }
|
|
|
|
CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; }
|
|
CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; }
|
|
|
|
void OverrideStartStatus(CompactionServiceJobStatus s) {
|
|
is_override_start_status_ = true;
|
|
override_start_status_ = s;
|
|
}
|
|
|
|
void OverrideWaitStatus(CompactionServiceJobStatus s) {
|
|
is_override_wait_status_ = true;
|
|
override_wait_status_ = s;
|
|
}
|
|
|
|
void OverrideWaitResult(std::string str) {
|
|
is_override_wait_result_ = true;
|
|
override_wait_result_ = std::move(str);
|
|
}
|
|
|
|
void ResetOverride() {
|
|
is_override_wait_result_ = false;
|
|
is_override_start_status_ = false;
|
|
is_override_wait_status_ = false;
|
|
}
|
|
|
|
void SetCanceled(bool canceled) { canceled_ = canceled; }
|
|
|
|
private:
|
|
InstrumentedMutex mutex_;
|
|
std::atomic_int compaction_num_{0};
|
|
std::map<uint64_t, std::string> jobs_;
|
|
const std::string db_path_;
|
|
Options options_;
|
|
std::shared_ptr<Statistics> statistics_;
|
|
CompactionServiceJobInfo start_info_;
|
|
CompactionServiceJobInfo wait_info_;
|
|
bool is_override_start_status_ = false;
|
|
CompactionServiceJobStatus override_start_status_ =
|
|
CompactionServiceJobStatus::kFailure;
|
|
bool is_override_wait_status_ = false;
|
|
CompactionServiceJobStatus override_wait_status_ =
|
|
CompactionServiceJobStatus::kFailure;
|
|
bool is_override_wait_result_ = false;
|
|
std::string override_wait_result_;
|
|
std::vector<std::shared_ptr<EventListener>> listeners_;
|
|
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
|
table_properties_collector_factories_;
|
|
std::atomic_bool canceled_{false};
|
|
};
|
|
|
|
class CompactionServiceTest : public DBTestBase {
|
|
public:
|
|
explicit CompactionServiceTest()
|
|
: DBTestBase("compaction_service_test", true) {}
|
|
|
|
protected:
|
|
void ReopenWithCompactionService(Options* options) {
|
|
options->env = env_;
|
|
primary_statistics_ = CreateDBStatistics();
|
|
options->statistics = primary_statistics_;
|
|
compactor_statistics_ = CreateDBStatistics();
|
|
|
|
compaction_service_ = std::make_shared<MyTestCompactionService>(
|
|
dbname_, *options, compactor_statistics_, remote_listeners,
|
|
remote_table_properties_collector_factories);
|
|
options->compaction_service = compaction_service_;
|
|
DestroyAndReopen(*options);
|
|
}
|
|
|
|
Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); }
|
|
|
|
Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); }
|
|
|
|
MyTestCompactionService* GetCompactionService() {
|
|
CompactionService* cs = compaction_service_.get();
|
|
return static_cast_with_check<MyTestCompactionService>(cs);
|
|
}
|
|
|
|
void GenerateTestData() {
|
|
// Generate 20 files @ L2
|
|
for (int i = 0; i < 20; i++) {
|
|
for (int j = 0; j < 10; j++) {
|
|
int key_id = i * 10 + j;
|
|
ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
|
|
}
|
|
ASSERT_OK(Flush());
|
|
}
|
|
MoveFilesToLevel(2);
|
|
|
|
// Generate 10 files @ L1 overlap with all 20 files @ L2
|
|
for (int i = 0; i < 10; i++) {
|
|
for (int j = 0; j < 10; j++) {
|
|
int key_id = i * 20 + j * 2;
|
|
ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
|
|
}
|
|
ASSERT_OK(Flush());
|
|
}
|
|
MoveFilesToLevel(1);
|
|
ASSERT_EQ(FilesPerLevel(), "0,10,20");
|
|
}
|
|
|
|
void VerifyTestData() {
|
|
for (int i = 0; i < 200; i++) {
|
|
auto result = Get(Key(i));
|
|
if (i % 2) {
|
|
ASSERT_EQ(result, "value" + std::to_string(i));
|
|
} else {
|
|
ASSERT_EQ(result, "value_new" + std::to_string(i));
|
|
}
|
|
}
|
|
}
|
|
|
|
std::vector<std::shared_ptr<EventListener>> remote_listeners;
|
|
std::vector<std::shared_ptr<TablePropertiesCollectorFactory>>
|
|
remote_table_properties_collector_factories;
|
|
|
|
private:
|
|
std::shared_ptr<Statistics> compactor_statistics_;
|
|
std::shared_ptr<Statistics> primary_statistics_;
|
|
std::shared_ptr<CompactionService> compaction_service_;
|
|
};
|
|
|
|
// Runs automatic compactions through the test compaction service and checks
// data, statistics attribution, failure propagation, and unique-id
// verification on reopen.
TEST_F(CompactionServiceTest, BasicCompactions) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  Statistics* primary_statistics = GetPrimaryStatistics();
  Statistics* compactor_statistics = GetCompactorStatistics();

  for (int i = 0; i < 20; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 10 + j;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int i = 0; i < 200; i++) {
    auto result = Get(Key(i));
    if (i % 2) {
      ASSERT_EQ(result, "value" + std::to_string(i));
    } else {
      ASSERT_EQ(result, "value_new" + std::to_string(i));
    }
  }
  auto my_cs = GetCompactionService();
  ASSERT_GE(my_cs->GetCompactionNum(), 1);

  // make sure the compaction statistics is only recorded on the remote side
  ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1);
  ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
  ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0);
  // even with remote compaction, primary host still needs to read SST files to
  // `verify_table()`.
  ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1);
  // all the compaction write happens on the remote side
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
  ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1);
  ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES),
            primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES));
  // the compactor is already the remote side, so its own REMOTE_* tickers
  // must stay at zero
  ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
  ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            0);

  // Test failed compaction
  SyncPoint::GetInstance()->SetCallBack(
      "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) {
        // override job status
        auto s = static_cast<Status*>(status);
        *s = Status::Aborted("MyTestCompactionService failed to compact!");
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Keep writing until the injected failure surfaces as an Aborted status
  // from any of Put/Flush/TEST_WaitForCompact.
  Status s;
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      s = Put(Key(key_id), "value_new" + std::to_string(key_id));
      if (s.IsAborted()) {
        break;
      }
    }
    if (s.IsAborted()) {
      break;
    }
    s = Flush();
    if (s.IsAborted()) {
      break;
    }
    s = dbfull()->TEST_WaitForCompact();
    if (s.IsAborted()) {
      break;
    }
  }
  ASSERT_TRUE(s.IsAborted());

  // Test re-open and successful unique id verification
  std::atomic_int verify_passed{0};
  SyncPoint::GetInstance()->SetCallBack(
      "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) {
        // count every table whose unique id was verified and make sure the
        // id is not the null id. ASSERT_TRUE (rather than assert) keeps the
        // check active in NDEBUG builds.
        auto id = static_cast<UniqueId64x2*>(arg);
        ASSERT_TRUE(*id != kNullUniqueId64x2);
        verify_passed++;
      });
  Reopen(options);
  ASSERT_GT(verify_passed, 0);
  Close();

  // The callbacks above capture locals by reference; drop them before those
  // locals go out of scope.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
|
|
|
|
// Exercises CompactRange() with bounded, half-open and unbounded ranges and
// checks that each call runs at least one compaction through the service
// while leaving the data intact.
TEST_F(CompactionServiceTest, ManualCompaction) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto my_cs = GetCompactionService();

  std::string start_str = Key(15);
  std::string end_str = Key(45);
  Slice start(start_str);
  Slice end(end_str);
  // GetCompactionNum() returns int; keep the snapshot in the same type so the
  // comparisons below do not mix signed and unsigned operands.
  int comp_num = my_cs->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
  VerifyTestData();

  // [Key(120), end of keyspace)
  start_str = Key(120);
  start = start_str;
  comp_num = my_cs->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
  VerifyTestData();

  // (beginning of keyspace, Key(92)]
  end_str = Key(92);
  end = end_str;
  comp_num = my_cs->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end));
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
  VerifyTestData();

  // whole keyspace
  comp_num = my_cs->GetCompactionNum();
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
  VerifyTestData();
}
|
|
|
|
// Checks that raising the service's cancel flag, either before the
// compaction starts or while it is in progress, makes CompactRange() return
// Incomplete and leaves the data untouched.
TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  auto my_cs = GetCompactionService();

  std::string start_str = Key(15);
  std::string end_str = Key(45);
  Slice start(start_str);
  Slice end(end_str);
  // GetCompactionNum() returns int; keep the snapshot in the same type.
  int comp_num = my_cs->GetCompactionNum();

  // Test cancel compaction at the beginning
  my_cs->SetCanceled(true);
  auto s = db_->CompactRange(CompactRangeOptions(), &start, &end);
  ASSERT_TRUE(s.IsIncomplete());
  // The service bumps its counter after every OpenAndCompact() call, even an
  // unsuccessful one, so only check that the counter did not go backwards.
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
  VerifyTestData();

  // Test cancel compaction in progress
  ReopenWithCompactionService(&options);
  GenerateTestData();
  my_cs = GetCompactionService();
  my_cs->SetCanceled(false);

  std::atomic_bool cancel_issued{false};
  SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress",
                                        [&](void* /*arg*/) {
                                          cancel_issued = true;
                                          my_cs->SetCanceled(true);
                                        });

  SyncPoint::GetInstance()->EnableProcessing();

  s = db_->CompactRange(CompactRangeOptions(), &start, &end);
  ASSERT_TRUE(s.IsIncomplete());
  ASSERT_TRUE(cancel_issued);
  // Same reasoning as above: the counter may have advanced, it must not have
  // decreased.
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num);
  VerifyTestData();

  // The callback captures `cancel_issued` and `my_cs` by reference; remove it
  // before they go out of scope.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
|
|
|
|
// A kFailure status from StartV2() must surface as an Incomplete status from
// the manual compaction.
TEST_F(CompactionServiceTest, FailedToStart) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  // Make StartV2() report failure for every job.
  MyTestCompactionService* service = GetCompactionService();
  service->OverrideStartStatus(CompactionServiceJobStatus::kFailure);

  const std::string begin_key = Key(15);
  const std::string limit_key = Key(45);
  Slice begin(begin_key);
  Slice limit(limit_key);

  Status status = db_->CompactRange(CompactRangeOptions(), &begin, &limit);
  ASSERT_TRUE(status.IsIncomplete());
}
|
|
|
|
// When the service hands back a result string that is not a valid serialized
// compaction result, the manual compaction must not report success.
TEST_F(CompactionServiceTest, InvalidResult) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  // Replace whatever WaitForCompleteV2() produces with garbage.
  MyTestCompactionService* service = GetCompactionService();
  service->OverrideWaitResult("Invalid Str");

  const std::string begin_key = Key(15);
  const std::string limit_key = Key(45);
  Slice begin(begin_key);
  Slice limit(limit_key);

  Status status = db_->CompactRange(CompactRangeOptions(), &begin, &limit);
  ASSERT_FALSE(status.ok());
}
|
|
|
|
// With sub-compactions enabled, a full manual compaction should reach the
// service as more than one job.
TEST_F(CompactionServiceTest, SubCompaction) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.max_subcompactions = 10;
  options.target_file_size_base = 1 << 10;  // 1KB
  ReopenWithCompactionService(&options);

  GenerateTestData();
  VerifyTestData();

  MyTestCompactionService* service = GetCompactionService();
  const int jobs_before = service->GetCompactionNum();

  CompactRangeOptions compact_options;
  compact_options.max_subcompactions = 10;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  VerifyTestData();

  // make sure there's sub-compaction by checking the compaction number
  const int jobs_run = service->GetCompactionNum() - jobs_before;
  ASSERT_GE(jobs_run, 2);
}
|
|
|
|
class PartialDeleteCompactionFilter : public CompactionFilter {
|
|
public:
|
|
CompactionFilter::Decision FilterV2(
|
|
int /*level*/, const Slice& key, ValueType /*value_type*/,
|
|
const Slice& /*existing_value*/, std::string* /*new_value*/,
|
|
std::string* /*skip_until*/) const override {
|
|
int i = std::stoi(key.ToString().substr(3));
|
|
if (i > 5 && i <= 105) {
|
|
return CompactionFilter::Decision::kRemove;
|
|
}
|
|
return CompactionFilter::Decision::kKeep;
|
|
}
|
|
|
|
const char* Name() const override { return "PartialDeleteCompactionFilter"; }
|
|
};
|
|
|
|
// The compaction filter configured on the primary options must also be
// applied by compactions that run through the service.
TEST_F(CompactionServiceTest, CompactionFilter) {
  Options options = CurrentOptions();
  // The options only hold a raw pointer; the filter lives for the whole test.
  PartialDeleteCompactionFilter delete_comp_filter;
  options.compaction_filter = &delete_comp_filter;
  ReopenWithCompactionService(&options);

  // keys 0..199, ten per flushed file
  for (int file = 0; file < 20; ++file) {
    const int first_key = file * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // overwrite the even keys
  for (int file = 0; file < 10; ++file) {
    const int first_key = file * 20;
    for (int step = 0; step < 10; ++step) {
      const int key_id = first_key + step * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    const auto result = Get(Key(key_id));
    const bool filtered_out = key_id > 5 && key_id <= 105;
    if (filtered_out) {
      ASSERT_EQ(result, "NOT_FOUND");
    } else if (key_id % 2 == 0) {
      ASSERT_EQ(result, "value_new" + std::to_string(key_id));
    } else {
      ASSERT_EQ(result, "value" + std::to_string(key_id));
    }
  }
  MyTestCompactionService* service = GetCompactionService();
  ASSERT_GE(service->GetCompactionNum(), 1);
}
|
|
|
|
// A value that is still visible through a snapshot must survive a compaction
// that runs through the service.
TEST_F(CompactionServiceTest, Snapshot) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  // first file: taken under the snapshot
  ASSERT_OK(Put(Key(1), "value1"));
  ASSERT_OK(Put(Key(2), "value1"));
  const Snapshot* snapshot = db_->GetSnapshot();
  ASSERT_OK(Flush());

  // second file: overwrites Key(1) after the snapshot
  ASSERT_OK(Put(Key(1), "value2"));
  ASSERT_OK(Put(Key(3), "value2"));
  ASSERT_OK(Flush());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_GE(GetCompactionService()->GetCompactionNum(), 1);

  ASSERT_EQ("value1", Get(Key(1), snapshot));
  ASSERT_EQ("value2", Get(Key(1)));
  db_->ReleaseSnapshot(snapshot);
}
|
|
|
|
// Compacts every L1 file into L2 from its own thread and checks that each
// file produced exactly one job on the service.
TEST_F(CompactionServiceTest, ConcurrentCompaction) {
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 100;
  options.max_background_jobs = 20;
  ReopenWithCompactionService(&options);
  GenerateTestData();

  ColumnFamilyMetaData meta;
  db_->GetColumnFamilyMetaData(&meta);

  // One CompactFiles() call per L1 file, all issued concurrently.
  std::vector<std::thread> workers;
  for (const auto& file : meta.levels[1].files) {
    workers.emplace_back([&]() {
      const std::string path = file.db_path + "/" + file.name;
      ASSERT_OK(db_->CompactFiles(CompactionOptions(), {path}, 2));
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    const auto result = Get(Key(key_id));
    if (key_id % 2 == 0) {
      ASSERT_EQ(result, "value_new" + std::to_string(key_id));
    } else {
      ASSERT_EQ(result, "value" + std::to_string(key_id));
    }
  }
  MyTestCompactionService* service = GetCompactionService();
  ASSERT_EQ(service->GetCompactionNum(), 10);
  ASSERT_EQ(FilesPerLevel(), "0,0,10");
}
|
|
|
|
// Checks the CompactionServiceJobInfo handed to StartV2()/WaitForCompleteV2():
// db name, db id, session id, and the priority for automatic (LOW), manual
// CompactFiles (USER) and bottom-level (BOTTOM) compactions.
TEST_F(CompactionServiceTest, CompactionInfo) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  for (int i = 0; i < 20; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 10 + j;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // GetCompactionService() already returns MyTestCompactionService*, so no
  // further cast is needed.
  auto my_cs = GetCompactionService();
  // GetCompactionNum() returns int; compare like with like.
  int comp_num = my_cs->GetCompactionNum();
  ASSERT_GE(comp_num, 1);

  CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart();
  ASSERT_EQ(dbname_, info.db_name);
  std::string db_id, db_session_id;
  ASSERT_OK(db_->GetDbIdentity(db_id));
  ASSERT_EQ(db_id, info.db_id);
  ASSERT_OK(db_->GetDbSessionId(db_session_id));
  ASSERT_EQ(db_session_id, info.db_session_id);
  ASSERT_EQ(Env::LOW, info.priority);
  info = my_cs->GetCompactionInfoForWait();
  ASSERT_EQ(dbname_, info.db_name);
  ASSERT_EQ(db_id, info.db_id);
  ASSERT_EQ(db_session_id, info.db_session_id);
  ASSERT_EQ(Env::LOW, info.priority);

  // Test priority USER
  ColumnFamilyMetaData meta;
  db_->GetColumnFamilyMetaData(&meta);
  SstFileMetaData file = meta.levels[1].files[0];
  ASSERT_OK(db_->CompactFiles(CompactionOptions(),
                              {file.db_path + "/" + file.name}, 2));
  info = my_cs->GetCompactionInfoForStart();
  ASSERT_EQ(Env::USER, info.priority);
  info = my_cs->GetCompactionInfoForWait();
  ASSERT_EQ(Env::USER, info.priority);

  // Test priority BOTTOM
  env_->SetBackgroundThreads(1, Env::BOTTOM);
  options.num_levels = 2;
  ReopenWithCompactionService(&options);
  // The reopen installed a new service instance; refresh the pointer.
  my_cs = GetCompactionService();

  for (int i = 0; i < 20; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 10 + j;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  for (int i = 0; i < 4; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  info = my_cs->GetCompactionInfoForStart();
  ASSERT_EQ(Env::BOTTOM, info.priority);
  info = my_cs->GetCompactionInfoForWait();
  ASSERT_EQ(Env::BOTTOM, info.priority);
}
|
|
|
|
// When StartV2() answers kUseLocal, automatic compactions must run on the
// primary: the service executes nothing and only the primary's compaction
// write ticker grows.
TEST_F(CompactionServiceTest, FallbackLocalAuto) {
  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  MyTestCompactionService* service = GetCompactionService();
  Statistics* compactor_stats = GetCompactorStatistics();
  Statistics* primary_stats = GetPrimaryStatistics();

  // Baselines taken before any data is written.
  const uint64_t compactor_written_before =
      compactor_stats->getTickerCount(COMPACT_WRITE_BYTES);
  const uint64_t primary_written_before =
      primary_stats->getTickerCount(COMPACT_WRITE_BYTES);

  service->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal);

  // keys 0..199, ten per flushed file
  for (int file = 0; file < 20; ++file) {
    const int first_key = file * 10;
    for (int key_id = first_key; key_id < first_key + 10; ++key_id) {
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  // overwrite the even keys
  for (int file = 0; file < 10; ++file) {
    const int first_key = file * 20;
    for (int step = 0; step < 10; ++step) {
      const int key_id = first_key + step * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify result
  for (int key_id = 0; key_id < 200; ++key_id) {
    const auto result = Get(Key(key_id));
    if (key_id % 2 == 0) {
      ASSERT_EQ(result, "value_new" + std::to_string(key_id));
    } else {
      ASSERT_EQ(result, "value" + std::to_string(key_id));
    }
  }

  ASSERT_EQ(service->GetCompactionNum(), 0);

  // make sure the compaction statistics is only recorded on the local side
  ASSERT_EQ(compactor_stats->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_written_before);
  ASSERT_GT(primary_stats->getTickerCount(COMPACT_WRITE_BYTES),
            primary_written_before);
  ASSERT_EQ(primary_stats->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0);
  ASSERT_EQ(primary_stats->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0);
}
|
|
|
|
// Manual compaction first runs through the service, then falls back to the
// primary once WaitForCompleteV2() answers kUseLocal; statistics must be
// attributed to whichever side did the work.
TEST_F(CompactionServiceTest, FallbackLocalManual) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  ReopenWithCompactionService(&options);

  GenerateTestData();
  VerifyTestData();

  auto my_cs = GetCompactionService();
  Statistics* compactor_statistics = GetCompactorStatistics();
  Statistics* primary_statistics = GetPrimaryStatistics();
  uint64_t compactor_write_bytes =
      compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
  uint64_t primary_write_bytes =
      primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);

  // make sure no status/result override is active, so the first manual
  // compaction goes through the service
  my_cs->ResetOverride();
  std::string start_str = Key(15);
  std::string end_str = Key(45);
  Slice start(start_str);
  Slice end(end_str);
  // GetCompactionNum() returns int; keep the snapshot in the same type so the
  // comparisons below do not mix signed and unsigned operands.
  int comp_num = my_cs->GetCompactionNum();

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end));
  ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1);
  // make sure the compaction statistics is only recorded on the remote side
  ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_write_bytes);
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES));
  ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            primary_write_bytes);

  // now make WaitForCompleteV2() answer kUseLocal, so the next manual
  // compaction has to run on the primary
  my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal);
  start_str = Key(120);
  start = start_str;
  comp_num = my_cs->GetCompactionNum();
  compactor_write_bytes =
      compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES);
  primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES);

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr));
  ASSERT_EQ(my_cs->GetCompactionNum(),
            comp_num);  // no remote compaction is run
  // make sure the compaction statistics is only recorded on the local side
  ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            compactor_write_bytes);
  ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES),
            primary_write_bytes);
  ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES),
            compactor_write_bytes);

  // verify result after 2 manual compactions
  VerifyTestData();
}
|
|
|
|
// Registers an EventListener with the compaction service and checks that its
// callbacks fire for compactions executed through the service.
TEST_F(CompactionServiceTest, RemoteEventListener) {
  class RemoteEventListenerTest : public EventListener {
   public:
    const char* Name() const override { return "RemoteEventListenerTest"; }

    void OnSubcompactionBegin(const SubcompactionJobInfo& info) override {
      auto result = on_going_compactions.emplace(info.job_id);
      ASSERT_TRUE(result.second);  // make sure there's no duplication
      compaction_num++;
      EventListener::OnSubcompactionBegin(info);
    }
    void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override {
      auto num = on_going_compactions.erase(info.job_id);
      ASSERT_TRUE(num == 1);  // make sure the compaction id exists
      EventListener::OnSubcompactionCompleted(info);
    }
    // File creation events must belong to a sub-compaction that has begun
    // and not yet completed.
    void OnTableFileCreated(const TableFileCreationInfo& info) override {
      ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
      file_created++;
      EventListener::OnTableFileCreated(info);
    }
    void OnTableFileCreationStarted(
        const TableFileCreationBriefInfo& info) override {
      ASSERT_EQ(on_going_compactions.count(info.job_id), 1);
      file_creation_started++;
      EventListener::OnTableFileCreationStarted(info);
    }

    // Counts how often the listener is queried; the answer itself is
    // delegated to the base class.
    bool ShouldBeNotifiedOnFileIO() override {
      file_io_notified++;
      return EventListener::ShouldBeNotifiedOnFileIO();
    }

    std::atomic_uint64_t file_io_notified{0};
    std::atomic_uint64_t file_creation_started{0};
    std::atomic_uint64_t file_created{0};

    std::set<int> on_going_compactions;  // store the job_id
    std::atomic_uint64_t compaction_num{0};
  };

  // make_shared instead of a naked `new`: the object is owned from the moment
  // it is created, even if growing the vector throws.
  auto listener = std::make_shared<RemoteEventListenerTest>();
  remote_listeners.emplace_back(listener);

  Options options = CurrentOptions();
  ReopenWithCompactionService(&options);

  for (int i = 0; i < 20; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 10 + j;
      ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }

  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 10; j++) {
      int key_id = i * 20 + j * 2;
      ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id)));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // check the events are triggered
  ASSERT_TRUE(listener->file_io_notified > 0);
  ASSERT_TRUE(listener->file_creation_started > 0);
  ASSERT_TRUE(listener->file_created > 0);
  ASSERT_TRUE(listener->compaction_num > 0);
  ASSERT_TRUE(listener->on_going_compactions.empty());

  // verify result
  for (int i = 0; i < 200; i++) {
    auto result = Get(Key(i));
    if (i % 2) {
      ASSERT_EQ(result, "value" + std::to_string(i));
    } else {
      ASSERT_EQ(result, "value_new" + std::to_string(i));
    }
  }
}
|
|
|
|
// Registers a TablePropertiesCollectorFactory with the compaction service
// only and checks that its user property shows up in files written by a
// compaction, but not in files flushed locally.
TEST_F(CompactionServiceTest, TablePropertiesCollector) {
  const static std::string kUserPropertyName = "TestCount";

  // Counts AddUserKey() calls and publishes the count under
  // kUserPropertyName when the table is finished.
  class TablePropertiesCollectorTest : public TablePropertiesCollector {
   public:
    Status Finish(UserCollectedProperties* properties) override {
      *properties = UserCollectedProperties{
          {kUserPropertyName, std::to_string(count_)},
      };
      return Status::OK();
    }

    UserCollectedProperties GetReadableProperties() const override {
      return UserCollectedProperties();
    }

    const char* Name() const override { return "TablePropertiesCollectorTest"; }

    Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/,
                      EntryType /*type*/, SequenceNumber /*seq*/,
                      uint64_t /*file_size*/) override {
      count_++;
      return Status::OK();
    }

   private:
    uint32_t count_ = 0;
  };

  class TablePropertiesCollectorFactoryTest
      : public TablePropertiesCollectorFactory {
   public:
    TablePropertiesCollector* CreateTablePropertiesCollector(
        TablePropertiesCollectorFactory::Context /*context*/) override {
      return new TablePropertiesCollectorTest();
    }

    const char* Name() const override {
      return "TablePropertiesCollectorFactoryTest";
    }
  };

  // make_shared instead of a naked `new`: the factory is owned from the
  // moment it is created, even if growing the vector throws.
  remote_table_properties_collector_factories.emplace_back(
      std::make_shared<TablePropertiesCollectorFactoryTest>());

  const int kNumSst = 3;
  const int kLevel0Trigger = 4;
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = kLevel0Trigger;
  ReopenWithCompactionService(&options);

  // generate a few SSTs locally which should not have user property
  for (int i = 0; i < kNumSst; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value"));
    }
    ASSERT_OK(Flush());
  }

  TablePropertiesCollection fname_to_props;
  ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));
  for (const auto& file_props : fname_to_props) {
    // bind by reference: the property map is only inspected, not modified
    const auto& properties = file_props.second->user_collected_properties;
    auto it = properties.find(kUserPropertyName);
    ASSERT_EQ(it, properties.end());
  }

  // trigger compaction
  for (int i = kNumSst; i < kLevel0Trigger; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 10 + j), "value"));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props));

  // at least one file must now carry the property, with a positive count
  bool has_user_property = false;
  for (const auto& file_props : fname_to_props) {
    const auto& properties = file_props.second->user_collected_properties;
    auto it = properties.find(kUserPropertyName);
    if (it != properties.end()) {
      has_user_property = true;
      ASSERT_GT(std::stoi(it->second), 0);
    }
  }
  ASSERT_TRUE(has_user_property);
}
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
RegisterCustomObjects(argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|