rocksdb/db/db_follower_test.cc
anand76 0ed93552f4 Implement obsolete file deletion (GC) in follower (#12657)
Summary:
This PR implements deletion of obsolete files in a follower RocksDB instance. The follower tails the leader's MANIFEST and creates links to newly added SST files. These links need to be deleted once those files become obsolete in order to reclaim space. There are three cases to be considered -
1. New files added and links created, but the Version could not be installed due to some missing files. Those links need to be preserved so a subsequent catch up attempt can succeed. We insert the next file number in the `VersionSet` to `pending_outputs_` to prevent their deletion.
2. Files deleted from the previous successfully installed `Version`. These are deleted as usual in `PurgeObsoleteFiles`.
3. New files added by a `VersionEdit` and deleted by a subsequent `VersionEdit`, both processed in the same catchup attempt. Links will be created for the new files when verifying a candidate `Version`. Those need to be deleted explicitly as they're never added to `VersionStorageInfo`, and thus not deleted by `PurgeObsoleteFiles`.

Test plan -
New unit tests in `db_follower_test`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12657

Reviewed By: jowlyzhang

Differential Revision: D57462697

Pulled By: anand1976

fbshipit-source-id: 898f15570638dd4930f839ffd31c560f9cb73916
2024-05-17 19:13:33 -07:00

528 lines
17 KiB
C++

// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
#ifdef OS_LINUX
class DBFollowerTest : public DBTestBase {
public:
// Create directories for leader and follower
// Create the leader DB object
DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) {
follower_name_ = dbname_ + "/follower";
db_parent_ = dbname_;
Close();
Destroy(CurrentOptions());
EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK());
dbname_ = dbname_ + "/leader";
Reopen(CurrentOptions());
}
~DBFollowerTest() {
follower_.reset();
EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK());
Destroy(CurrentOptions());
dbname_ = db_parent_;
}
protected:
class DBFollowerTestFS : public FileSystemWrapper {
public:
explicit DBFollowerTestFS(const std::shared_ptr<FileSystem>& target)
: FileSystemWrapper(target),
cv_(&mutex_),
barrier_(false),
count_(0),
reinit_count_(0) {}
const char* Name() const override { return "DBFollowerTestFS"; }
IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* dbg = nullptr) override {
class DBFollowerTestSeqFile : public FSSequentialFileWrapper {
public:
DBFollowerTestSeqFile(DBFollowerTestFS* fs,
std::unique_ptr<FSSequentialFile>&& file,
uint64_t /*size*/)
: FSSequentialFileWrapper(file.get()),
fs_(fs),
file_(std::move(file)) {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override {
fs_->BarrierWait();
return target()->Read(n, options, result, scratch, dbg);
}
private:
DBFollowerTestFS* fs_;
std::unique_ptr<FSSequentialFile> file_;
};
std::unique_ptr<FSSequentialFile> file;
IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg);
if (s.ok() && test::GetFileType(fname) == kDescriptorFile) {
uint64_t size = 0;
EXPECT_EQ(target()->GetFileSize(fname, IOOptions(), &size, nullptr),
IOStatus::OK());
result->reset(new DBFollowerTestSeqFile(this, std::move(file), size));
} else {
*result = std::move(file);
}
return s;
}
void BarrierInit(int count) {
MutexLock l(&mutex_);
barrier_ = true;
count_ = count;
}
void BarrierWait() {
MutexLock l(&mutex_);
if (!barrier_) {
return;
}
if (--count_ == 0) {
if (reinit_count_ > 0) {
count_ = reinit_count_;
reinit_count_ = 0;
} else {
barrier_ = false;
}
cv_.SignalAll();
} else {
cv_.Wait();
}
}
void BarrierWaitAndReinit(int count) {
MutexLock l(&mutex_);
if (!barrier_) {
return;
}
reinit_count_ = count;
if (--count_ == 0) {
if (reinit_count_ > 0) {
count_ = reinit_count_;
reinit_count_ = 0;
} else {
barrier_ = false;
}
cv_.SignalAll();
} else {
cv_.Wait();
}
}
private:
port::Mutex mutex_;
port::CondVar cv_;
bool barrier_;
int count_;
int reinit_count_;
};
class DBFollowerTestSstPartitioner : public SstPartitioner {
public:
explicit DBFollowerTestSstPartitioner(uint64_t max_keys)
: max_keys_(max_keys), num_keys_(0) {}
const char* Name() const override { return "DBFollowerTestSstPartitioner"; }
PartitionerResult ShouldPartition(
const PartitionerRequest& /*request*/) override {
if (++num_keys_ > max_keys_) {
num_keys_ = 0;
return PartitionerResult::kRequired;
} else {
return PartitionerResult::kNotRequired;
}
}
bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
const Slice& /*largest_user_key*/) override {
return true;
}
private:
uint64_t max_keys_;
uint64_t num_keys_;
};
class DBFollowerTestSstPartitionerFactory : public SstPartitionerFactory {
public:
explicit DBFollowerTestSstPartitionerFactory(uint64_t max_keys)
: max_keys_(max_keys) {}
std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& /*context*/) const override {
std::unique_ptr<SstPartitioner> partitioner;
partitioner.reset(new DBFollowerTestSstPartitioner(max_keys_));
return partitioner;
}
const char* Name() const override {
return "DBFollowerTestSstPartitionerFactory";
}
private:
uint64_t max_keys_;
};
Status OpenAsFollower() {
Options opts = CurrentOptions();
if (!follower_env_) {
follower_env_ = NewCompositeEnv(
std::make_shared<DBFollowerTestFS>(env_->GetFileSystem()));
}
opts.env = follower_env_.get();
opts.follower_refresh_catchup_period_ms = 100;
return DB::OpenAsFollower(opts, follower_name_, dbname_, &follower_);
}
std::string FollowerGet(const std::string& k) {
ReadOptions options;
options.verify_checksums = true;
std::string result;
Status s = follower()->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
DB* follower() { return follower_.get(); }
DBFollowerTestFS* follower_fs() {
return static_cast<DBFollowerTestFS*>(follower_env_->GetFileSystem().get());
}
void CheckDirs() {
std::vector<std::string> db_children;
std::vector<std::string> follower_children;
EXPECT_OK(env_->GetChildren(dbname_, &db_children));
EXPECT_OK(env_->GetChildren(follower_name_, &follower_children));
std::set<uint64_t> db_filenums;
std::set<uint64_t> follower_filenums;
for (auto& name : db_children) {
if (test::GetFileType(name) != kTableFile) {
continue;
}
db_filenums.insert(test::GetFileNumber(name));
}
for (auto& name : follower_children) {
if (test::GetFileType(name) != kTableFile) {
continue;
}
follower_filenums.insert(test::GetFileNumber(name));
}
db_filenums.merge(follower_filenums);
EXPECT_EQ(follower_filenums.size(), db_filenums.size());
}
private:
std::string follower_name_;
std::string db_parent_;
std::unique_ptr<Env> follower_env_;
std::unique_ptr<DB> follower_;
};
TEST_F(DBFollowerTest, Basic) {
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(OpenAsFollower());
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
CheckDirs();
}
TEST_F(DBFollowerTest, Flush) {
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"Leader::Done", "DBImplFollower::TryCatchupWithLeader:Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(OpenAsFollower());
TEST_SYNC_POINT("Leader::Start");
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
TEST_SYNC_POINT("Leader::Done");
TEST_SYNC_POINT("Follower::WaitForCatchup");
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
CheckDirs();
SyncPoint::GetInstance()->DisableProcessing();
}
// This test creates 4 L0 files, immediately followed by a compaction to L1.
// The follower replays the 4 flush records from the MANIFEST unsuccessfully,
// and then successfully recovers a Version from the compaction record
TEST_F(DBFollowerTest, RetryCatchup) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
Reopen(opts);
ASSERT_OK(OpenAsFollower());
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"DBImpl::BackgroundCompaction:Start",
"DBImplFollower::TryCatchupWithLeader:Begin2"},
{"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
"DBImpl::BackgroundCompaction:BeforeCompaction"},
{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Leader::Start");
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v4"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
TEST_SYNC_POINT("Follower::WaitForCatchup");
ASSERT_EQ(FollowerGet("k1"), "v4");
CheckDirs();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
// This test validates the same as the previous test, except there is a
// MANIFEST rollover between the flushes and compaction. The follower
// does not switch to a new MANIFEST in ReadAndApply. So it would require
// another round of refresh before catching up.
TEST_F(DBFollowerTest, RetryCatchupManifestRollover) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
Reopen(opts);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v3"));
ASSERT_OK(Flush());
ASSERT_OK(OpenAsFollower());
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"},
{"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
"Leader::Done"},
{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Leader::Start");
ASSERT_OK(Put("k1", "v4"));
ASSERT_OK(Flush());
TEST_SYNC_POINT("Leader::Flushed");
TEST_SYNC_POINT("Leader::Done");
Reopen(opts);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:2"},
});
TEST_SYNC_POINT("Follower::WaitForCatchup:2");
ASSERT_EQ(FollowerGet("k1"), "v4");
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
// This test creates 4 L0 files and compacts them. The follower, during catchup,
// successfully instantiates 4 Versions corresponding to the 4 files (but
// donesn't install them yet), followed by deleting those 4 and adding a new
// file from compaction. The test verifies that the 4 L0 files are deleted
// correctly by the follower.
// We use teh Barrier* functions to ensure that the follower first sees the 4
// L0 files and is able to link them, and then sees the compaction that
// obsoletes those L0 files (so those L0 files are intermediates that it has
// to explicitly delete). Suppose we don't have any barriers, its possible
// the follower reads the L0 records and compaction records from the MANIFEST
// in one read, which means those L0 files would have already been deleted
// by the leader and the follower cannot link to them.
TEST_F(DBFollowerTest, IntermediateObsoleteFiles) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
Reopen(opts);
ASSERT_OK(OpenAsFollower());
follower_fs()->BarrierInit(2);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v4"));
ASSERT_OK(Flush());
follower_fs()->BarrierWaitAndReinit(2);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
follower_fs()->BarrierWait();
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
CheckDirs();
ASSERT_EQ(FollowerGet("k1"), "v4");
}
// This test verifies a scenario where the follower can recover a Version
// partially (i.e some of the additions cannot be found), and the files
// that are found are obsoleted by a subsequent VersionEdit.
TEST_F(DBFollowerTest, PartialVersionRecovery) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
opts.sst_partitioner_factory =
std::make_shared<DBFollowerTestSstPartitionerFactory>(1);
Reopen(opts);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Put("k2", "v1"));
ASSERT_OK(Put("k3", "v1"));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k3", "v2"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_OK(OpenAsFollower());
ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
{{"max_compaction_bytes", "1"}}));
follower_fs()->BarrierInit(2);
Slice key("k1");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWaitAndReinit(2);
// The second compaction input overlaps the previous compaction outputs
// by one file. This file is never added to VersionStorageInfo since it
// was added and deleted before the catch up completes. We later verify that
// the follower correctly deleted this file.
key = Slice("k3");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWait();
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
CheckDirs();
ASSERT_EQ(FollowerGet("k1"), "v2");
ASSERT_EQ(FollowerGet("k2"), "v1");
ASSERT_EQ(FollowerGet("k3"), "v2");
SyncPoint::GetInstance()->DisableProcessing();
}
// This test verifies a scenario similar to the PartialVersionRecovery, except
// with a MANIFEST rollover in between. When there is a rollover, the
// follower's attempt ends without installing a new Version. The next catch up
// attempt will recover a full Version.
TEST_F(DBFollowerTest, PartialVersionRecoveryWithRollover) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
opts.sst_partitioner_factory =
std::make_shared<DBFollowerTestSstPartitionerFactory>(1);
Reopen(opts);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Put("k2", "v1"));
ASSERT_OK(Put("k3", "v1"));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k3", "v2"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
opts.max_compaction_bytes = 1;
Reopen(opts);
ASSERT_OK(OpenAsFollower());
follower_fs()->BarrierInit(2);
Slice key("k1");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWaitAndReinit(2);
Reopen(opts);
key = Slice("k3");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWait();
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1",
"Follower::WaitForCatchup:1"},
{"Follower::WaitForCatchup:2",
"DBImplFollower::TryCatchupWithLeader:Begin2"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
TEST_SYNC_POINT("Follower::WaitForCatchup:2");
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:3"},
});
TEST_SYNC_POINT("Follower::WaitForCatchup:3");
CheckDirs();
ASSERT_EQ(FollowerGet("k1"), "v2");
ASSERT_EQ(FollowerGet("k2"), "v1");
ASSERT_EQ(FollowerGet("k3"), "v2");
SyncPoint::GetInstance()->DisableProcessing();
}
#endif
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}