// Copyright (c) 2024-present, Facebook, Inc. All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/sync_point.h"

namespace ROCKSDB_NAMESPACE {

#ifdef OS_LINUX

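// Test fixture for follower tests. The leader DB lives under
// <test dir>/leader (managed by DBTestBase), while the follower is opened
// separately under <test dir>/follower via DB::OpenAsFollower().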
class DBFollowerTest : public DBTestBase {
 public:
  // Create directories for leader and follower
  // Create the leader DB object
  DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) {
    follower_name_ = dbname_ + "/follower";
    db_parent_ = dbname_;
    Close();
    Destroy(CurrentOptions());
    EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK());
    dbname_ = dbname_ + "/leader";
    Reopen(CurrentOptions());
  }

  ~DBFollowerTest() {
    follower_.reset();
    EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK());
    Destroy(CurrentOptions());
    dbname_ = db_parent_;
  }

 protected:
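  // FileSystem wrapper installed in the follower's Env. It intercepts reads
  // of the MANIFEST (descriptor) file so tests can pause the follower's
  // catchup at a barrier (see BarrierInit/BarrierWait below).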
  class DBFollowerTestFS : public FileSystemWrapper {
   public:
    explicit DBFollowerTestFS(const std::shared_ptr<FileSystem>& target)
        : FileSystemWrapper(target),
          cv_(&mutex_),
          barrier_(false),
          count_(0),
          reinit_count_(0) {}

    const char* Name() const override { return "DBFollowerTestFS"; }

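    // For MANIFEST (descriptor) files, return a wrapped sequential file whose
    // Read() first waits on the test barrier. This lets a test control
    // exactly when the follower's tailing reads observe new MANIFEST records.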
    IOStatus NewSequentialFile(const std::string& fname,
                               const FileOptions& file_opts,
                               std::unique_ptr<FSSequentialFile>* result,
                               IODebugContext* dbg = nullptr) override {
      class DBFollowerTestSeqFile : public FSSequentialFileWrapper {
       public:
        DBFollowerTestSeqFile(DBFollowerTestFS* fs,
                              std::unique_ptr<FSSequentialFile>&& file,
                              uint64_t /*size*/)
            : FSSequentialFileWrapper(file.get()),
              fs_(fs),
              file_(std::move(file)) {}

        IOStatus Read(size_t n, const IOOptions& options, Slice* result,
                      char* scratch, IODebugContext* dbg) override {
          fs_->BarrierWait();
          return target()->Read(n, options, result, scratch, dbg);
        }

       private:
        DBFollowerTestFS* fs_;
        std::unique_ptr<FSSequentialFile> file_;
      };

      std::unique_ptr<FSSequentialFile> file;
      IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg);

      if (s.ok() && test::GetFileType(fname) == kDescriptorFile) {
        uint64_t size = 0;
        EXPECT_EQ(target()->GetFileSize(fname, IOOptions(), &size, nullptr),
                  IOStatus::OK());
        result->reset(new DBFollowerTestSeqFile(this, std::move(file), size));
      } else {
        *result = std::move(file);
      }
      return s;
    }

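    // Simple re-armable barrier. BarrierInit() arms the barrier for `count`
    // waiters. BarrierWait() blocks until `count` callers have arrived and
    // then releases them all; BarrierWaitAndReinit() additionally re-arms the
    // barrier for `count` more waiters before releasing. If the barrier is
    // not armed, both waits return immediately.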
    void BarrierInit(int count) {
      MutexLock l(&mutex_);
      barrier_ = true;
      count_ = count;
    }

    void BarrierWait() {
      MutexLock l(&mutex_);
      if (!barrier_) {
        return;
      }
      if (--count_ == 0) {
        if (reinit_count_ > 0) {
          count_ = reinit_count_;
          reinit_count_ = 0;
        } else {
          barrier_ = false;
        }
        cv_.SignalAll();
      } else {
        cv_.Wait();
      }
    }

    void BarrierWaitAndReinit(int count) {
      MutexLock l(&mutex_);
      if (!barrier_) {
        return;
      }
      reinit_count_ = count;
      if (--count_ == 0) {
        if (reinit_count_ > 0) {
          count_ = reinit_count_;
          reinit_count_ = 0;
        } else {
          barrier_ = false;
        }
        cv_.SignalAll();
      } else {
        cv_.Wait();
      }
    }

   private:
    port::Mutex mutex_;
    port::CondVar cv_;
    bool barrier_;
    int count_;
    int reinit_count_;
  };

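  // SST partitioner that requests an output file split after every
  // `max_keys` keys, so compactions produce multiple small output files.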
  class DBFollowerTestSstPartitioner : public SstPartitioner {
   public:
    explicit DBFollowerTestSstPartitioner(uint64_t max_keys)
        : max_keys_(max_keys), num_keys_(0) {}

    const char* Name() const override { return "DBFollowerTestSstPartitioner"; }

    PartitionerResult ShouldPartition(
        const PartitionerRequest& /*request*/) override {
      if (++num_keys_ > max_keys_) {
        num_keys_ = 0;
        return PartitionerResult::kRequired;
      } else {
        return PartitionerResult::kNotRequired;
      }
    }

    bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
                          const Slice& /*largest_user_key*/) override {
      return true;
    }

   private:
    uint64_t max_keys_;
    uint64_t num_keys_;
  };

  class DBFollowerTestSstPartitionerFactory : public SstPartitionerFactory {
   public:
    explicit DBFollowerTestSstPartitionerFactory(uint64_t max_keys)
        : max_keys_(max_keys) {}

    std::unique_ptr<SstPartitioner> CreatePartitioner(
        const SstPartitioner::Context& /*context*/) const override {
      std::unique_ptr<SstPartitioner> partitioner;
      partitioner.reset(new DBFollowerTestSstPartitioner(max_keys_));
      return partitioner;
    }

    const char* Name() const override {
      return "DBFollowerTestSstPartitionerFactory";
    }

   private:
    uint64_t max_keys_;
  };

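  // Open the follower instance against the leader's directory, using a
  // composite Env that wraps the leader's FileSystem in DBFollowerTestFS and
  // a 100ms catchup refresh period.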
  Status OpenAsFollower() {
    Options opts = CurrentOptions();
    if (!follower_env_) {
      follower_env_ = NewCompositeEnv(
          std::make_shared<DBFollowerTestFS>(env_->GetFileSystem()));
    }
    opts.env = follower_env_.get();
    opts.follower_refresh_catchup_period_ms = 100;
    return DB::OpenAsFollower(opts, follower_name_, dbname_, &follower_);
  }

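  // Get() on the follower with checksum verification. Returns "NOT_FOUND" for
  // missing keys, or the status string on any other error.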
  std::string FollowerGet(const std::string& k) {
    ReadOptions options;
    options.verify_checksums = true;
    std::string result;
    Status s = follower()->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  DB* follower() { return follower_.get(); }
  DBFollowerTestFS* follower_fs() {
    return static_cast<DBFollowerTestFS*>(follower_env_->GetFileSystem().get());
  }

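  // Verify that the follower directory contains exactly the same set of table
  // (SST) file numbers as the leader directory. After the merge, db_filenums
  // holds the union and follower_filenums retains only the common file
  // numbers, so equal sizes imply identical sets.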
  void CheckDirs() {
    std::vector<std::string> db_children;
    std::vector<std::string> follower_children;
    EXPECT_OK(env_->GetChildren(dbname_, &db_children));
    EXPECT_OK(env_->GetChildren(follower_name_, &follower_children));

    std::set<uint64_t> db_filenums;
    std::set<uint64_t> follower_filenums;
    for (auto& name : db_children) {
      if (test::GetFileType(name) != kTableFile) {
        continue;
      }
      db_filenums.insert(test::GetFileNumber(name));
    }
    for (auto& name : follower_children) {
      if (test::GetFileType(name) != kTableFile) {
        continue;
      }
      follower_filenums.insert(test::GetFileNumber(name));
    }
    db_filenums.merge(follower_filenums);
    EXPECT_EQ(follower_filenums.size(), db_filenums.size());
  }

 private:
  std::string follower_name_;
  std::string db_parent_;
  std::unique_ptr<Env> follower_env_;
  std::unique_ptr<DB> follower_;
};

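// Open the follower after the leader has flushed some data and verify that
// the data is visible and both directories link the same table files.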
TEST_F(DBFollowerTest, Basic) {
  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k2", "v2"));
  ASSERT_OK(Flush());

  ASSERT_OK(OpenAsFollower());
  std::string val;
  ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
  ASSERT_EQ(val, "v1");
  CheckDirs();
}

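// Verify that a flush on the leader becomes visible to an already-open
// follower. The sync point dependencies place the leader's flush after the
// follower's catchup begins and before it resumes at Begin2, and the test
// waits for that catchup to finish before reading.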
TEST_F(DBFollowerTest, Flush) {
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
      {"Leader::Done", "DBImplFollower::TryCatchupWithLeader:Begin2"},
      {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_OK(OpenAsFollower());
  TEST_SYNC_POINT("Leader::Start");
  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Flush());
  TEST_SYNC_POINT("Leader::Done");

  TEST_SYNC_POINT("Follower::WaitForCatchup");
  std::string val;
  ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
  ASSERT_EQ(val, "v1");
  CheckDirs();

  SyncPoint::GetInstance()->DisableProcessing();
}

// This test creates 4 L0 files, immediately followed by a compaction to L1.
// The follower replays the 4 flush records from the MANIFEST unsuccessfully,
// and then successfully recovers a Version from the compaction record.
TEST_F(DBFollowerTest, RetryCatchup) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  Reopen(opts);

  ASSERT_OK(OpenAsFollower());
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
      {"DBImpl::BackgroundCompaction:Start",
       "DBImplFollower::TryCatchupWithLeader:Begin2"},
      {"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
       "DBImpl::BackgroundCompaction:BeforeCompaction"},
      {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
       "VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
      {"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
  TEST_SYNC_POINT("Leader::Start");
  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v3"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v4"));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));

  TEST_SYNC_POINT("Follower::WaitForCatchup");
  ASSERT_EQ(FollowerGet("k1"), "v4");
  CheckDirs();

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}

// This test validates the same scenario as the previous test, except there is
// a MANIFEST rollover between the flushes and the compaction. The follower
// does not switch to a new MANIFEST in ReadAndApply, so it requires another
// round of refresh before catching up.
TEST_F(DBFollowerTest, RetryCatchupManifestRollover) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  Reopen(opts);

  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v3"));
  ASSERT_OK(Flush());
  ASSERT_OK(OpenAsFollower());
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
      {"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"},
      {"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
       "Leader::Done"},
      {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
       "VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
      {"DBImplFollower::TryCatchupWithLeader:End",
       "Follower::WaitForCatchup:1"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
  TEST_SYNC_POINT("Leader::Start");
  ASSERT_OK(Put("k1", "v4"));
  ASSERT_OK(Flush());

  TEST_SYNC_POINT("Leader::Flushed");
  TEST_SYNC_POINT("Leader::Done");
  Reopen(opts);
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));

  TEST_SYNC_POINT("Follower::WaitForCatchup:1");
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:End",
       "Follower::WaitForCatchup:2"},
  });
  TEST_SYNC_POINT("Follower::WaitForCatchup:2");
  ASSERT_EQ(FollowerGet("k1"), "v4");

  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->DisableProcessing();
}

// This test creates 4 L0 files and compacts them. The follower, during
// catchup, successfully instantiates 4 Versions corresponding to the 4 files
// (but doesn't install them yet), followed by deleting those 4 and adding a
// new file from the compaction. The test verifies that the 4 L0 files are
// deleted correctly by the follower.
// We use the Barrier* functions to ensure that the follower first sees the 4
// L0 files and is able to link them, and then sees the compaction that
// obsoletes those L0 files (so those L0 files are intermediates that it has
// to explicitly delete). Without the barriers, the follower might read the L0
// records and the compaction records from the MANIFEST in one read, which
// means those L0 files would have already been deleted by the leader and the
// follower could not link to them.
TEST_F(DBFollowerTest, IntermediateObsoleteFiles) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  Reopen(opts);
  ASSERT_OK(OpenAsFollower());

  follower_fs()->BarrierInit(2);
  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v3"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k1", "v4"));
  ASSERT_OK(Flush());
  follower_fs()->BarrierWaitAndReinit(2);

  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
  follower_fs()->BarrierWait();

  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:End",
       "Follower::WaitForCatchup:1"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
  TEST_SYNC_POINT("Follower::WaitForCatchup:1");
  CheckDirs();
  ASSERT_EQ(FollowerGet("k1"), "v4");
}

// This test verifies a scenario where the follower can recover a Version
// partially (i.e., some of the additions cannot be found), and the files that
// are found are obsoleted by a subsequent VersionEdit.
TEST_F(DBFollowerTest, PartialVersionRecovery) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.sst_partitioner_factory =
      std::make_shared<DBFollowerTestSstPartitionerFactory>(1);
  Reopen(opts);

  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Put("k2", "v1"));
  ASSERT_OK(Put("k3", "v1"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);

  ASSERT_OK(Put("k1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k3", "v2"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);

  ASSERT_OK(OpenAsFollower());
  ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
                                 {{"max_compaction_bytes", "1"}}));

  follower_fs()->BarrierInit(2);
  Slice key("k1");
  ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));

  follower_fs()->BarrierWaitAndReinit(2);

  // The second compaction input overlaps the previous compaction outputs
  // by one file. This file is never added to VersionStorageInfo since it
  // was added and deleted before the catchup completes. We later verify that
  // the follower correctly deleted this file.
  key = Slice("k3");
  ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
  follower_fs()->BarrierWait();

  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:End",
       "Follower::WaitForCatchup:1"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
  TEST_SYNC_POINT("Follower::WaitForCatchup:1");
  CheckDirs();
  ASSERT_EQ(FollowerGet("k1"), "v2");
  ASSERT_EQ(FollowerGet("k2"), "v1");
  ASSERT_EQ(FollowerGet("k3"), "v2");
  SyncPoint::GetInstance()->DisableProcessing();
}

// This test verifies a scenario similar to PartialVersionRecovery, except
// with a MANIFEST rollover in between. When there is a rollover, the
// follower's attempt ends without installing a new Version. The next catchup
// attempt will recover a full Version.
TEST_F(DBFollowerTest, PartialVersionRecoveryWithRollover) {
  Options opts = CurrentOptions();
  opts.disable_auto_compactions = true;
  opts.sst_partitioner_factory =
      std::make_shared<DBFollowerTestSstPartitionerFactory>(1);
  Reopen(opts);

  ASSERT_OK(Put("k1", "v1"));
  ASSERT_OK(Put("k2", "v1"));
  ASSERT_OK(Put("k3", "v1"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(2);

  ASSERT_OK(Put("k1", "v2"));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("k3", "v2"));
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);

  opts.max_compaction_bytes = 1;
  Reopen(opts);

  ASSERT_OK(OpenAsFollower());

  follower_fs()->BarrierInit(2);
  Slice key("k1");
  ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));

  follower_fs()->BarrierWaitAndReinit(2);
  Reopen(opts);
  key = Slice("k3");
  ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
  follower_fs()->BarrierWait();

  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:Begin1",
       "Follower::WaitForCatchup:1"},
      {"Follower::WaitForCatchup:2",
       "DBImplFollower::TryCatchupWithLeader:Begin2"},
  });
  SyncPoint::GetInstance()->EnableProcessing();
  TEST_SYNC_POINT("Follower::WaitForCatchup:1");
  TEST_SYNC_POINT("Follower::WaitForCatchup:2");
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImplFollower::TryCatchupWithLeader:End",
       "Follower::WaitForCatchup:3"},
  });
  TEST_SYNC_POINT("Follower::WaitForCatchup:3");
  CheckDirs();
  ASSERT_EQ(FollowerGet("k1"), "v2");
  ASSERT_EQ(FollowerGet("k2"), "v1");
  ASSERT_EQ(FollowerGet("k3"), "v2");
  SyncPoint::GetInstance()->DisableProcessing();
}

#endif  // OS_LINUX

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}