Implement obsolete file deletion (GC) in follower (#12657)

Summary:
This PR implements deletion of obsolete files in a follower RocksDB instance. The follower tails the leader's MANIFEST and creates links to newly added SST files. These links need to be deleted once those files become obsolete in order to reclaim space. There are three cases to be considered -
1. New files added and links created, but the Version could not be installed due to some missing files. Those links need to be preserved so a subsequent catch up attempt can succeed. We insert the next file number in the `VersionSet` to `pending_outputs_` to prevent their deletion.
2. Files deleted from the previous successfully installed `Version`. These are deleted as usual in `PurgeObsoleteFiles`.
3. New files added by a `VersionEdit` and deleted by a subsequent `VersionEdit`, both processed in the same catchup attempt. Links will be created for the new files when verifying a candidate `Version`. Those need to be deleted explicitly as they're never added to `VersionStorageInfo`, and thus not deleted by `PurgeObsoleteFiles`.

Test plan -
New unit tests in `db_follower_test`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/12657

Reviewed By: jowlyzhang

Differential Revision: D57462697

Pulled By: anand1976

fbshipit-source-id: 898f15570638dd4930f839ffd31c560f9cb73916
This commit is contained in:
anand76 2024-05-17 19:13:33 -07:00 committed by Facebook GitHub Bot
parent ffd7930312
commit 0ed93552f4
12 changed files with 614 additions and 28 deletions

View File

@ -17,6 +17,7 @@ class DBFollowerTest : public DBTestBase {
// Create the leader DB object
DBFollowerTest() : DBTestBase("/db_follower_test", /*env_do_fsync*/ false) {
follower_name_ = dbname_ + "/follower";
db_parent_ = dbname_;
Close();
Destroy(CurrentOptions());
EXPECT_EQ(env_->CreateDirIfMissing(dbname_), Status::OK());
@ -27,17 +28,215 @@ class DBFollowerTest : public DBTestBase {
~DBFollowerTest() {
follower_.reset();
EXPECT_EQ(DestroyDB(follower_name_, CurrentOptions()), Status::OK());
Destroy(CurrentOptions());
dbname_ = db_parent_;
}
protected:
Status OpenAsFollower() {
return DB::OpenAsFollower(CurrentOptions(), follower_name_, dbname_,
&follower_);
class DBFollowerTestFS : public FileSystemWrapper {
public:
explicit DBFollowerTestFS(const std::shared_ptr<FileSystem>& target)
: FileSystemWrapper(target),
cv_(&mutex_),
barrier_(false),
count_(0),
reinit_count_(0) {}
const char* Name() const override { return "DBFollowerTestFS"; }
IOStatus NewSequentialFile(const std::string& fname,
const FileOptions& file_opts,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* dbg = nullptr) override {
class DBFollowerTestSeqFile : public FSSequentialFileWrapper {
public:
DBFollowerTestSeqFile(DBFollowerTestFS* fs,
std::unique_ptr<FSSequentialFile>&& file,
uint64_t /*size*/)
: FSSequentialFileWrapper(file.get()),
fs_(fs),
file_(std::move(file)) {}
IOStatus Read(size_t n, const IOOptions& options, Slice* result,
char* scratch, IODebugContext* dbg) override {
fs_->BarrierWait();
return target()->Read(n, options, result, scratch, dbg);
}
private:
DBFollowerTestFS* fs_;
std::unique_ptr<FSSequentialFile> file_;
};
std::unique_ptr<FSSequentialFile> file;
IOStatus s = target()->NewSequentialFile(fname, file_opts, &file, dbg);
if (s.ok() && test::GetFileType(fname) == kDescriptorFile) {
uint64_t size = 0;
EXPECT_EQ(target()->GetFileSize(fname, IOOptions(), &size, nullptr),
IOStatus::OK());
result->reset(new DBFollowerTestSeqFile(this, std::move(file), size));
} else {
*result = std::move(file);
}
return s;
}
void BarrierInit(int count) {
MutexLock l(&mutex_);
barrier_ = true;
count_ = count;
}
void BarrierWait() {
MutexLock l(&mutex_);
if (!barrier_) {
return;
}
if (--count_ == 0) {
if (reinit_count_ > 0) {
count_ = reinit_count_;
reinit_count_ = 0;
} else {
barrier_ = false;
}
cv_.SignalAll();
} else {
cv_.Wait();
}
}
void BarrierWaitAndReinit(int count) {
MutexLock l(&mutex_);
if (!barrier_) {
return;
}
reinit_count_ = count;
if (--count_ == 0) {
if (reinit_count_ > 0) {
count_ = reinit_count_;
reinit_count_ = 0;
} else {
barrier_ = false;
}
cv_.SignalAll();
} else {
cv_.Wait();
}
}
private:
port::Mutex mutex_;
port::CondVar cv_;
bool barrier_;
int count_;
int reinit_count_;
};
class DBFollowerTestSstPartitioner : public SstPartitioner {
public:
explicit DBFollowerTestSstPartitioner(uint64_t max_keys)
: max_keys_(max_keys), num_keys_(0) {}
const char* Name() const override { return "DBFollowerTestSstPartitioner"; }
PartitionerResult ShouldPartition(
const PartitionerRequest& /*request*/) override {
if (++num_keys_ > max_keys_) {
num_keys_ = 0;
return PartitionerResult::kRequired;
} else {
return PartitionerResult::kNotRequired;
}
}
bool CanDoTrivialMove(const Slice& /*smallest_user_key*/,
const Slice& /*largest_user_key*/) override {
return true;
}
private:
uint64_t max_keys_;
uint64_t num_keys_;
};
class DBFollowerTestSstPartitionerFactory : public SstPartitionerFactory {
public:
explicit DBFollowerTestSstPartitionerFactory(uint64_t max_keys)
: max_keys_(max_keys) {}
std::unique_ptr<SstPartitioner> CreatePartitioner(
const SstPartitioner::Context& /*context*/) const override {
std::unique_ptr<SstPartitioner> partitioner;
partitioner.reset(new DBFollowerTestSstPartitioner(max_keys_));
return partitioner;
}
const char* Name() const override {
return "DBFollowerTestSstPartitionerFactory";
}
private:
uint64_t max_keys_;
};
Status OpenAsFollower() {
Options opts = CurrentOptions();
if (!follower_env_) {
follower_env_ = NewCompositeEnv(
std::make_shared<DBFollowerTestFS>(env_->GetFileSystem()));
}
opts.env = follower_env_.get();
opts.follower_refresh_catchup_period_ms = 100;
return DB::OpenAsFollower(opts, follower_name_, dbname_, &follower_);
}
std::string FollowerGet(const std::string& k) {
ReadOptions options;
options.verify_checksums = true;
std::string result;
Status s = follower()->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
DB* follower() { return follower_.get(); }
DBFollowerTestFS* follower_fs() {
return static_cast<DBFollowerTestFS*>(follower_env_->GetFileSystem().get());
}
void CheckDirs() {
std::vector<std::string> db_children;
std::vector<std::string> follower_children;
EXPECT_OK(env_->GetChildren(dbname_, &db_children));
EXPECT_OK(env_->GetChildren(follower_name_, &follower_children));
std::set<uint64_t> db_filenums;
std::set<uint64_t> follower_filenums;
for (auto& name : db_children) {
if (test::GetFileType(name) != kTableFile) {
continue;
}
db_filenums.insert(test::GetFileNumber(name));
}
for (auto& name : follower_children) {
if (test::GetFileType(name) != kTableFile) {
continue;
}
follower_filenums.insert(test::GetFileNumber(name));
}
db_filenums.merge(follower_filenums);
EXPECT_EQ(follower_filenums.size(), db_filenums.size());
}
private:
std::string follower_name_;
std::string db_parent_;
std::unique_ptr<Env> follower_env_;
std::unique_ptr<DB> follower_;
};
@ -51,8 +250,273 @@ TEST_F(DBFollowerTest, Basic) {
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
CheckDirs();
}
TEST_F(DBFollowerTest, Flush) {
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"Leader::Done", "DBImplFollower::TryCatchupWithLeader:Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(OpenAsFollower());
TEST_SYNC_POINT("Leader::Start");
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
TEST_SYNC_POINT("Leader::Done");
TEST_SYNC_POINT("Follower::WaitForCatchup");
std::string val;
ASSERT_OK(follower()->Get(ReadOptions(), "k1", &val));
ASSERT_EQ(val, "v1");
CheckDirs();
SyncPoint::GetInstance()->DisableProcessing();
}
// This test creates 4 L0 files, immediately followed by a compaction to L1.
// The follower replays the 4 flush records from the MANIFEST unsuccessfully,
// and then successfully recovers a Version from the compaction record
TEST_F(DBFollowerTest, RetryCatchup) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
Reopen(opts);
ASSERT_OK(OpenAsFollower());
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"DBImpl::BackgroundCompaction:Start",
"DBImplFollower::TryCatchupWithLeader:Begin2"},
{"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
"DBImpl::BackgroundCompaction:BeforeCompaction"},
{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End", "Follower::WaitForCatchup"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Leader::Start");
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v4"));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
TEST_SYNC_POINT("Follower::WaitForCatchup");
ASSERT_EQ(FollowerGet("k1"), "v4");
CheckDirs();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
// This test validates the same as the previous test, except there is a
// MANIFEST rollover between the flushes and compaction. The follower
// does not switch to a new MANIFEST in ReadAndApply. So it would require
// another round of refresh before catching up.
TEST_F(DBFollowerTest, RetryCatchupManifestRollover) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
Reopen(opts);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v3"));
ASSERT_OK(Flush());
ASSERT_OK(OpenAsFollower());
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1", "Leader::Start"},
{"Leader::Flushed", "DBImplFollower::TryCatchupWithLeader:Begin2"},
{"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1",
"Leader::Done"},
{"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles",
"VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2"},
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Leader::Start");
ASSERT_OK(Put("k1", "v4"));
ASSERT_OK(Flush());
TEST_SYNC_POINT("Leader::Flushed");
TEST_SYNC_POINT("Leader::Done");
Reopen(opts);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:2"},
});
TEST_SYNC_POINT("Follower::WaitForCatchup:2");
ASSERT_EQ(FollowerGet("k1"), "v4");
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
}
// This test creates 4 L0 files and compacts them. The follower, during catchup,
// successfully instantiates 4 Versions corresponding to the 4 files (but
// donesn't install them yet), followed by deleting those 4 and adding a new
// file from compaction. The test verifies that the 4 L0 files are deleted
// correctly by the follower.
// We use teh Barrier* functions to ensure that the follower first sees the 4
// L0 files and is able to link them, and then sees the compaction that
// obsoletes those L0 files (so those L0 files are intermediates that it has
// to explicitly delete). Suppose we don't have any barriers, its possible
// the follower reads the L0 records and compaction records from the MANIFEST
// in one read, which means those L0 files would have already been deleted
// by the leader and the follower cannot link to them.
TEST_F(DBFollowerTest, IntermediateObsoleteFiles) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
Reopen(opts);
ASSERT_OK(OpenAsFollower());
follower_fs()->BarrierInit(2);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v3"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k1", "v4"));
ASSERT_OK(Flush());
follower_fs()->BarrierWaitAndReinit(2);
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
follower_fs()->BarrierWait();
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
CheckDirs();
ASSERT_EQ(FollowerGet("k1"), "v4");
}
// This test verifies a scenario where the follower can recover a Version
// partially (i.e some of the additions cannot be found), and the files
// that are found are obsoleted by a subsequent VersionEdit.
TEST_F(DBFollowerTest, PartialVersionRecovery) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
opts.sst_partitioner_factory =
std::make_shared<DBFollowerTestSstPartitionerFactory>(1);
Reopen(opts);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Put("k2", "v1"));
ASSERT_OK(Put("k3", "v1"));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k3", "v2"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
ASSERT_OK(OpenAsFollower());
ASSERT_OK(dbfull()->SetOptions(dbfull()->DefaultColumnFamily(),
{{"max_compaction_bytes", "1"}}));
follower_fs()->BarrierInit(2);
Slice key("k1");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWaitAndReinit(2);
// The second compaction input overlaps the previous compaction outputs
// by one file. This file is never added to VersionStorageInfo since it
// was added and deleted before the catch up completes. We later verify that
// the follower correctly deleted this file.
key = Slice("k3");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWait();
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:1"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
CheckDirs();
ASSERT_EQ(FollowerGet("k1"), "v2");
ASSERT_EQ(FollowerGet("k2"), "v1");
ASSERT_EQ(FollowerGet("k3"), "v2");
SyncPoint::GetInstance()->DisableProcessing();
}
// This test verifies a scenario similar to the PartialVersionRecovery, except
// with a MANIFEST rollover in between. When there is a rollover, the
// follower's attempt ends without installing a new Version. The next catch up
// attempt will recover a full Version.
TEST_F(DBFollowerTest, PartialVersionRecoveryWithRollover) {
Options opts = CurrentOptions();
opts.disable_auto_compactions = true;
opts.sst_partitioner_factory =
std::make_shared<DBFollowerTestSstPartitionerFactory>(1);
Reopen(opts);
ASSERT_OK(Put("k1", "v1"));
ASSERT_OK(Put("k2", "v1"));
ASSERT_OK(Put("k3", "v1"));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_OK(Put("k1", "v2"));
ASSERT_OK(Flush());
ASSERT_OK(Put("k3", "v2"));
ASSERT_OK(Flush());
MoveFilesToLevel(1);
opts.max_compaction_bytes = 1;
Reopen(opts);
ASSERT_OK(OpenAsFollower());
follower_fs()->BarrierInit(2);
Slice key("k1");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWaitAndReinit(2);
Reopen(opts);
key = Slice("k3");
ASSERT_OK(dbfull()->TEST_CompactRange(1, &key, &key, nullptr, true));
follower_fs()->BarrierWait();
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:Begin1",
"Follower::WaitForCatchup:1"},
{"Follower::WaitForCatchup:2",
"DBImplFollower::TryCatchupWithLeader:Begin2"},
});
SyncPoint::GetInstance()->EnableProcessing();
TEST_SYNC_POINT("Follower::WaitForCatchup:1");
TEST_SYNC_POINT("Follower::WaitForCatchup:2");
SyncPoint::GetInstance()->LoadDependency({
{"DBImplFollower::TryCatchupWithLeader:End",
"Follower::WaitForCatchup:3"},
});
TEST_SYNC_POINT("Follower::WaitForCatchup:3");
CheckDirs();
ASSERT_EQ(FollowerGet("k1"), "v2");
ASSERT_EQ(FollowerGet("k2"), "v1");
ASSERT_EQ(FollowerGet("k3"), "v2");
SyncPoint::GetInstance()->DisableProcessing();
}
#endif
} // namespace ROCKSDB_NAMESPACE

View File

@ -1648,6 +1648,7 @@ class DBImpl : public DB {
friend class ForwardIterator;
friend struct SuperVersion;
friend class CompactedDBImpl;
friend class DBImplFollower;
#ifndef NDEBUG
friend class DBTest_ConcurrentFlushWAL_Test;
friend class DBTest_MixedSlowdownOptionsStop_Test;

View File

@ -95,17 +95,28 @@ Status DBImplFollower::TryCatchUpWithLeader() {
assert(versions_.get() != nullptr);
assert(manifest_reader_.get() != nullptr);
Status s;
TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin1");
TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:Begin2");
// read the manifest and apply new changes to the follower instance
std::unordered_set<ColumnFamilyData*> cfds_changed;
JobContext job_context(0, true /*create_superversion*/);
{
InstrumentedMutexLock lock_guard(&mutex_);
std::vector<std::string> files_to_delete;
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_,
manifest_reader_status_.get(), &cfds_changed);
manifest_reader_status_.get(), &cfds_changed,
&files_to_delete);
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_);
pending_outputs_inserted_elem_.reset(new std::list<uint64_t>::iterator(
CaptureCurrentFileNumberInPendingOutputs()));
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));
ROCKS_LOG_INFO(
immutable_db_options_.info_log, "Next file number is %" PRIu64,
static_cast<uint64_t>(versions_->current_next_file_number()));
for (ColumnFamilyData* cfd : cfds_changed) {
if (cfd->IsDropped()) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n",
@ -147,9 +158,33 @@ Status DBImplFollower::TryCatchUpWithLeader() {
sv_context.NewSuperVersion();
}
}
for (auto& file : files_to_delete) {
IOStatus io_s = fs_->DeleteFile(file, IOOptions(), nullptr);
if (!io_s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Cannot delete file %s: %s", file.c_str(),
io_s.ToString().c_str());
}
}
}
job_context.Clean();
// Cleanup unused, obsolete files.
JobContext purge_files_job_context(0);
{
InstrumentedMutexLock lock_guard(&mutex_);
// Currently, follower instance does not create any database files, thus
// is unnecessary for the follower to force full scan.
FindObsoleteFiles(&purge_files_job_context, /*force=*/false);
}
if (purge_files_job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(purge_files_job_context);
}
purge_files_job_context.Clean();
TEST_SYNC_POINT("DBImplFollower::TryCatchupWithLeader:End");
return s;
}
@ -199,6 +234,8 @@ Status DBImplFollower::Close() {
catch_up_thread_.reset();
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem_);
return DBImpl::Close();
}

View File

@ -27,7 +27,7 @@ class DBImplFollower : public DBImplSecondary {
bool OwnTablesAndLogs() const override {
// TODO: Change this to true once we've properly implemented file
// deletion for the read scaling case
return false;
return true;
}
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
@ -49,5 +49,6 @@ class DBImplFollower : public DBImplSecondary {
std::string src_path_;
port::Mutex mu_;
port::CondVar cv_;
std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem_;
};
} // namespace ROCKSDB_NAMESPACE

View File

@ -680,7 +680,8 @@ Status DBImplSecondary::TryCatchUpWithPrimary() {
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast_with_check<ReactiveVersionSet>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_,
manifest_reader_status_.get(), &cfds_changed);
manifest_reader_status_.get(), &cfds_changed,
/*files_to_delete=*/nullptr);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64,
static_cast<uint64_t>(versions_->LastSequence()));

View File

@ -152,7 +152,7 @@ Status FileChecksumRetriever::ApplyVersionEdit(VersionEdit& edit,
VersionEditHandler::VersionEditHandler(
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, bool track_missing_files,
VersionSet* version_set, bool track_found_and_missing_files,
bool no_error_if_files_missing, const std::shared_ptr<IOTracer>& io_tracer,
const ReadOptions& read_options, bool skip_load_table_files,
EpochNumberRequirement epoch_number_requirement)
@ -160,7 +160,7 @@ VersionEditHandler::VersionEditHandler(
read_only_(read_only),
column_families_(std::move(column_families)),
version_set_(version_set),
track_missing_files_(track_missing_files),
track_found_and_missing_files_(track_found_and_missing_files),
no_error_if_files_missing_(no_error_if_files_missing),
io_tracer_(io_tracer),
skip_load_table_files_(skip_load_table_files),
@ -500,7 +500,8 @@ ColumnFamilyData* VersionEditHandler::CreateCfAndInit(
assert(builders_.find(cf_id) == builders_.end());
builders_.emplace(cf_id,
VersionBuilderUPtr(new BaseReferencedVersionBuilder(cfd)));
if (track_missing_files_) {
if (track_found_and_missing_files_) {
cf_to_found_files_.emplace(cf_id, std::unordered_set<uint64_t>());
cf_to_missing_files_.emplace(cf_id, std::unordered_set<uint64_t>());
cf_to_missing_blob_files_high_.emplace(cf_id, kInvalidBlobFileNumber);
}
@ -513,7 +514,11 @@ ColumnFamilyData* VersionEditHandler::DestroyCfAndCleanup(
auto builder_iter = builders_.find(cf_id);
assert(builder_iter != builders_.end());
builders_.erase(builder_iter);
if (track_missing_files_) {
if (track_found_and_missing_files_) {
auto found_files_iter = cf_to_found_files_.find(cf_id);
assert(found_files_iter != cf_to_found_files_.end());
cf_to_found_files_.erase(found_files_iter);
auto missing_files_iter = cf_to_missing_files_.find(cf_id);
assert(missing_files_iter != cf_to_missing_files_.end());
cf_to_missing_files_.erase(missing_files_iter);
@ -729,7 +734,7 @@ VersionEditHandlerPointInTime::VersionEditHandlerPointInTime(
const ReadOptions& read_options,
EpochNumberRequirement epoch_number_requirement)
: VersionEditHandler(read_only, column_families, version_set,
/*track_missing_files=*/true,
/*track_found_and_missing_files=*/true,
/*no_error_if_files_missing=*/true, io_tracer,
read_options, epoch_number_requirement) {}
@ -824,6 +829,12 @@ void VersionEditHandlerPointInTime::CheckIterationResult(
version_set_->AppendVersion(cfd, v_iter->second);
versions_.erase(v_iter);
// Let's clear found_files, since any files in that are part of the
// installed Version. Any files that got obsoleted would have already
// been moved to intermediate_files_
auto found_files_iter = cf_to_found_files_.find(cfd->GetID());
assert(found_files_iter != cf_to_found_files_.end());
found_files_iter->second.clear();
}
}
} else {
@ -854,10 +865,16 @@ ColumnFamilyData* VersionEditHandlerPointInTime::DestroyCfAndCleanup(
Status VersionEditHandlerPointInTime::MaybeCreateVersion(
const VersionEdit& edit, ColumnFamilyData* cfd, bool force_create_version) {
TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin1");
TEST_SYNC_POINT("VersionEditHandlerPointInTime::MaybeCreateVersion:Begin2");
assert(cfd != nullptr);
if (!force_create_version) {
assert(edit.GetColumnFamily() == cfd->GetID());
}
auto found_files_iter = cf_to_found_files_.find(cfd->GetID());
assert(found_files_iter != cf_to_found_files_.end());
std::unordered_set<uint64_t>& found_files = found_files_iter->second;
auto missing_files_iter = cf_to_missing_files_.find(cfd->GetID());
assert(missing_files_iter != cf_to_missing_files_.end());
std::unordered_set<uint64_t>& missing_files = missing_files_iter->second;
@ -889,6 +906,18 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
auto fiter = missing_files.find(file_num);
if (fiter != missing_files.end()) {
missing_files.erase(fiter);
} else {
fiter = found_files.find(file_num);
// Only mark new files added during this catchup attempt for deletion.
// These files were never installed in VersionStorageInfo.
// Already referenced files that are deleted by a VersionEdit will
// be added to the VersionStorageInfo's obsolete files when the old
// version is dereferenced.
if (fiter != found_files.end()) {
intermediate_files_.emplace_back(
MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_num));
found_files.erase(fiter);
}
}
}
@ -904,9 +933,14 @@ Status VersionEditHandlerPointInTime::MaybeCreateVersion(
s = VerifyFile(cfd, fpath, level, meta);
if (s.IsPathNotFound() || s.IsNotFound() || s.IsCorruption()) {
missing_files.insert(file_num);
if (s.IsCorruption()) {
found_files.insert(file_num);
}
s = Status::OK();
} else if (!s.ok()) {
break;
} else {
found_files.insert(file_num);
}
}

View File

@ -104,7 +104,7 @@ using VersionBuilderUPtr = std::unique_ptr<BaseReferencedVersionBuilder>;
// To use this class and its subclasses,
// 1. Create an object of VersionEditHandler or its subclasses.
// VersionEditHandler handler(read_only, column_families, version_set,
// track_missing_files,
// track_found_and_missing_files,
// no_error_if_files_missing);
// 2. Status s = handler.Iterate(reader, &db_id);
// 3. Check s and handle possible errors.
@ -116,16 +116,17 @@ class VersionEditHandler : public VersionEditHandlerBase {
explicit VersionEditHandler(
bool read_only,
const std::vector<ColumnFamilyDescriptor>& column_families,
VersionSet* version_set, bool track_missing_files,
VersionSet* version_set, bool track_found_and_missing_files,
bool no_error_if_files_missing,
const std::shared_ptr<IOTracer>& io_tracer,
const ReadOptions& read_options,
EpochNumberRequirement epoch_number_requirement =
EpochNumberRequirement::kMustPresent)
: VersionEditHandler(
read_only, column_families, version_set, track_missing_files,
: VersionEditHandler(read_only, column_families, version_set,
track_found_and_missing_files,
no_error_if_files_missing, io_tracer, read_options,
/*skip_load_table_files=*/false, epoch_number_requirement) {}
/*skip_load_table_files=*/false,
epoch_number_requirement) {}
~VersionEditHandler() override {}
@ -144,7 +145,7 @@ class VersionEditHandler : public VersionEditHandlerBase {
protected:
explicit VersionEditHandler(
bool read_only, std::vector<ColumnFamilyDescriptor> column_families,
VersionSet* version_set, bool track_missing_files,
VersionSet* version_set, bool track_found_and_missing_files,
bool no_error_if_files_missing,
const std::shared_ptr<IOTracer>& io_tracer,
const ReadOptions& read_options, bool skip_load_table_files,
@ -195,7 +196,8 @@ class VersionEditHandler : public VersionEditHandlerBase {
// by subsequent manifest records, Recover() will return failure status.
std::unordered_map<uint32_t, std::string> column_families_not_found_;
VersionEditParams version_edit_params_;
const bool track_missing_files_;
const bool track_found_and_missing_files_;
std::unordered_map<uint32_t, std::unordered_set<uint64_t>> cf_to_found_files_;
std::unordered_map<uint32_t, std::unordered_set<uint64_t>>
cf_to_missing_files_;
std::unordered_map<uint32_t, uint64_t> cf_to_missing_blob_files_high_;
@ -273,6 +275,8 @@ class VersionEditHandlerPointInTime : public VersionEditHandler {
bool in_atomic_group_ = false;
std::vector<std::string> intermediate_files_;
private:
bool AtomicUpdateVersionsCompleted();
bool AtomicUpdateVersionsContains(uint32_t cfid);
@ -310,6 +314,10 @@ class ManifestTailer : public VersionEditHandlerPointInTime {
return cfds_changed_;
}
std::vector<std::string>& GetIntermediateFiles() {
return intermediate_files_;
}
protected:
Status Initialize() override;
@ -342,7 +350,7 @@ class DumpManifestHandler : public VersionEditHandler {
bool json)
: VersionEditHandler(
/*read_only=*/true, column_families, version_set,
/*track_missing_files=*/false,
/*track_found_and_missing_files=*/false,
/*no_error_if_files_missing=*/false, io_tracer, read_options,
/*skip_load_table_files=*/true),
verbose_(verbose),

View File

@ -6063,8 +6063,8 @@ Status VersionSet::Recover(
true /* checksum */, 0 /* log_number */);
VersionEditHandler handler(
read_only, column_families, const_cast<VersionSet*>(this),
/*track_missing_files=*/false, no_error_if_files_missing, io_tracer_,
read_options, EpochNumberRequirement::kMightMissing);
/*track_found_and_missing_files=*/false, no_error_if_files_missing,
io_tracer_, read_options, EpochNumberRequirement::kMightMissing);
handler.Iterate(reader, &log_read_status);
s = handler.status();
if (s.ok()) {
@ -7439,7 +7439,8 @@ Status ReactiveVersionSet::ReadAndApply(
InstrumentedMutex* mu,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
Status* manifest_read_status,
std::unordered_set<ColumnFamilyData*>* cfds_changed) {
std::unordered_set<ColumnFamilyData*>* cfds_changed,
std::vector<std::string>* files_to_delete) {
assert(manifest_reader != nullptr);
assert(cfds_changed != nullptr);
mu->AssertHeld();
@ -7456,6 +7457,9 @@ Status ReactiveVersionSet::ReadAndApply(
if (s.ok()) {
*cfds_changed = std::move(manifest_tailer_->GetUpdatedColumnFamilies());
}
if (files_to_delete) {
*files_to_delete = std::move(manifest_tailer_->GetIntermediateFiles());
}
return s;
}

View File

@ -1741,7 +1741,8 @@ class ReactiveVersionSet : public VersionSet {
InstrumentedMutex* mu,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
Status* manifest_read_status,
std::unordered_set<ColumnFamilyData*>* cfds_changed);
std::unordered_set<ColumnFamilyData*>* cfds_changed,
std::vector<std::string>* files_to_delete);
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,

View File

@ -2742,7 +2742,8 @@ TEST_F(VersionSetAtomicGroupTest,
std::unordered_set<ColumnFamilyData*> cfds_changed;
mu.Lock();
EXPECT_OK(reactive_versions_->ReadAndApply(
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed));
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
/*files_to_delete=*/nullptr));
mu.Unlock();
EXPECT_TRUE(first_in_atomic_group_);
EXPECT_TRUE(last_in_atomic_group_);
@ -2797,7 +2798,8 @@ TEST_F(VersionSetAtomicGroupTest,
std::unordered_set<ColumnFamilyData*> cfds_changed;
mu.Lock();
EXPECT_OK(reactive_versions_->ReadAndApply(
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed));
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
/*files_to_delete=*/nullptr));
mu.Unlock();
// Reactive version set should be empty now.
EXPECT_TRUE(reactive_versions_->TEST_read_edits_in_atomic_group() == 0);
@ -2826,7 +2828,8 @@ TEST_F(VersionSetAtomicGroupTest,
std::unordered_set<ColumnFamilyData*> cfds_changed;
mu.Lock();
EXPECT_OK(reactive_versions_->ReadAndApply(
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed));
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
/*files_to_delete=*/nullptr));
mu.Unlock();
EXPECT_TRUE(first_in_atomic_group_);
EXPECT_FALSE(last_in_atomic_group_);
@ -2882,7 +2885,8 @@ TEST_F(VersionSetAtomicGroupTest,
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_NOK(reactive_versions_->ReadAndApply(
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed));
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
/*files_to_delete=*/nullptr));
mu.Unlock();
EXPECT_EQ(edits_[kAtomicGroupSize / 2].DebugString(),
corrupted_edit_.DebugString());
@ -2932,7 +2936,8 @@ TEST_F(VersionSetAtomicGroupTest,
AddNewEditsToLog(kAtomicGroupSize);
mu.Lock();
EXPECT_NOK(reactive_versions_->ReadAndApply(
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed));
&mu, &manifest_reader, manifest_reader_status.get(), &cfds_changed,
/*files_to_delete=*/nullptr));
mu.Unlock();
EXPECT_EQ(edits_[1].DebugString(),
edit_with_incorrect_group_size_.DebugString());

View File

@ -565,6 +565,30 @@ void DeleteDir(Env* env, const std::string& dirname) {
TryDeleteDir(env, dirname).PermitUncheckedError();
}
FileType GetFileType(const std::string& path) {
FileType type = kTempFile;
std::size_t found = path.find_last_of('/');
if (found == std::string::npos) {
found = 0;
}
std::string file_name = path.substr(found);
uint64_t number = 0;
ParseFileName(file_name, &number, &type);
return type;
}
uint64_t GetFileNumber(const std::string& path) {
FileType type = kTempFile;
std::size_t found = path.find_last_of('/');
if (found == std::string::npos) {
found = 0;
}
std::string file_name = path.substr(found);
uint64_t number = 0;
ParseFileName(file_name, &number, &type);
return number;
}
Status CreateEnvFromSystem(const ConfigOptions& config_options, Env** result,
std::shared_ptr<Env>* guard) {
const char* env_uri = getenv("TEST_ENV_URI");

View File

@ -882,6 +882,12 @@ Status TryDeleteDir(Env* env, const std::string& dirname);
// Delete a directory if it exists
void DeleteDir(Env* env, const std::string& dirname);
// Find the FileType from the file path
FileType GetFileType(const std::string& path);
// Get the file number given the file path
uint64_t GetFileNumber(const std::string& path);
// Creates an Env from the system environment by looking at the system
// environment variables.
Status CreateEnvFromSystem(const ConfigOptions& options, Env** result,