diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c74e1db41..fe72adfa2e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -66,7 +66,7 @@ else() include_directories(${JEMALLOC_INCLUDE_DIR}) endif() endif() - + # No config file for this option(WITH_GFLAGS "build with GFlags" ON) if(WITH_GFLAGS) @@ -833,6 +833,7 @@ if(WITH_TESTS) db/db_write_test.cc db/dbformat_test.cc db/deletefile_test.cc + db/obsolete_files_test.cc db/external_sst_file_basic_test.cc db/external_sst_file_test.cc db/fault_injection_test.cc diff --git a/Makefile b/Makefile index c0c530c1f4..9a6143eb4c 100644 --- a/Makefile +++ b/Makefile @@ -474,6 +474,7 @@ TESTS = \ write_batch_with_index_test \ write_controller_test\ deletefile_test \ + obsolete_files_test \ table_test \ geodb_test \ delete_scheduler_test \ @@ -1380,6 +1381,9 @@ options_file_test: db/options_file_test.o $(LIBOBJECTS) $(TESTHARNESS) deletefile_test: db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +obsolete_files_test: db/obsolete_files_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + geodb_test: utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 4123b35824..fd06856a42 100644 --- a/TARGETS +++ b/TARGETS @@ -655,6 +655,11 @@ ROCKS_TESTS = [ "utilities/document/document_db_test.cc", "serial", ], + [ + "obsolete_files_test", + "db/obsolete_files_test.cc", + "serial", + ], [ "dynamic_bloom_test", "util/dynamic_bloom_test.cc", diff --git a/db/db_impl.cc b/db/db_impl.cc index f7ba90f528..6e6408c9fa 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -300,6 +300,8 @@ Status DBImpl::CloseHelper() { TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); bg_cv_.Wait(); } + TEST_SYNC_POINT_CALLBACK("DBImpl::CloseHelper:PendingPurgeFinished", + &files_grabbed_for_purge_); EraseThreadStatusDbInfo(); flush_scheduler_.Clear(); diff --git a/db/db_impl.h b/db/db_impl.h index 33e44bf4d0..3bf8d50f05 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -1186,6 +1186,10 @@ class DBImpl : public DB { // A queue to store filenames of the files to be purged std::deque purge_queue_; + // A vector to store the file numbers that have been assigned to certain + // JobContext. Current implementation tracks ssts only. + std::vector files_grabbed_for_purge_; + // A queue to store log writers to close std::deque logs_to_free_queue_; int unscheduled_flushes_; @@ -1389,6 +1393,9 @@ class DBImpl : public DB { bool HaveManualCompaction(ColumnFamilyData* cfd); bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1); + bool ShouldPurge(uint64_t file_number) const; + void MarkAsGrabbedForPurge(uint64_t file_number); + size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index ca14b84432..92104476cf 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -1479,6 +1479,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, // have created (they might not be all recorded in job_context in case of a // failure). Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); + TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"); // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToClean() || @@ -1492,6 +1493,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, log_buffer.FlushBufferToLog(); if (job_context.HaveSomethingToDelete()) { PurgeObsoleteFiles(job_context); + TEST_SYNC_POINT("DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles"); } job_context.Clean(); mutex_.Lock(); @@ -2074,6 +2076,37 @@ void DBImpl::InstallSuperVersionAndScheduleWork( mutable_cf_options.max_write_buffer_number; } +// ShouldPurge is called by FindObsoleteFiles when doing a full scan, +// and db mutex (mutex_) should already be held. This function performs a +// linear scan of an vector (files_grabbed_for_purge_) in search of a +// certain element. We expect FindObsoleteFiles with full scan to occur once +// every 10 hours by default, and the size of the vector is small. +// Therefore, the cost is affordable even if the mutex is held. +// Actually, the current implementation of FindObsoleteFiles with +// full_scan=true can issue I/O requests to obtain list of files in +// directories, e.g. env_->getChildren while holding db mutex. +// In the future, if we want to reduce the cost of search, we may try to keep +// the vector sorted. +bool DBImpl::ShouldPurge(uint64_t file_number) const { + for (auto fn : files_grabbed_for_purge_) { + if (file_number == fn) { + return false; + } + } + for (const auto& purge_file_info : purge_queue_) { + if (purge_file_info.number == file_number) { + return false; + } + } + return true; +} + +// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex +// (mutex_) should already be held. +void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) { + files_grabbed_for_purge_.emplace_back(file_number); +} + void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) { InstrumentedMutexLock l(&mutex_); // snapshot_checker_ should only set once. If we need to set it multiple diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 885b7b8ec5..9837871291 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -12,6 +12,7 @@ #define __STDC_FORMAT_MACROS #endif #include +#include #include "db/event_helpers.h" #include "util/file_util.h" #include "util/sst_file_manager_impl.h" @@ -187,6 +188,13 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, &job_context->manifest_delete_files, job_context->min_pending_output); + // Mark the elements in job_context->sst_delete_files as grabbedForPurge + // so that other threads calling FindObsoleteFiles with full_scan=true + // will not add these files to candidate list for purge. + for (const auto sst_to_del : job_context->sst_delete_files) { + MarkAsGrabbedForPurge(sst_to_del->fd.GetNumber()); + } + // store the current filenum, lognum, etc job_context->manifest_file_number = versions_->manifest_file_number(); job_context->pending_manifest_file_number = @@ -197,6 +205,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, versions_->AddLiveFiles(&job_context->sst_live); if (doing_the_full_scan) { + InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), + dbname_); for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size(); path_id++) { // set of all files in the directory. We'll exclude files that are still @@ -204,7 +214,21 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, std::vector files; env_->GetChildren(immutable_db_options_.db_paths[path_id].path, &files); // Ignore errors - for (std::string file : files) { + for (const std::string& file : files) { + uint64_t number; + FileType type; + // 1. If we cannot parse the file name, we skip; + // 2. If the file with file_number equals number has already been + // grabbed for purge by another compaction job, or it has already been + // schedule for purge, we also skip it if we + // are doing full scan in order to avoid double deletion of the same + // file under race conditions. See + // https://github.com/facebook/rocksdb/issues/3573 + if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) || + !ShouldPurge(number)) { + continue; + } + // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes job_context->full_scan_candidate_files.emplace_back( "/" + file, static_cast(path_id)); @@ -216,7 +240,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, std::vector log_files; env_->GetChildren(immutable_db_options_.wal_dir, &log_files); // Ignore errors - for (std::string log_file : log_files) { + for (const std::string& log_file : log_files) { job_context->full_scan_candidate_files.emplace_back(log_file, 0); } } @@ -318,6 +342,8 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, } else { file_deletion_status = env_->DeleteFile(fname); } + TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion", + &file_deletion_status); if (file_deletion_status.ok()) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id, @@ -409,6 +435,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { std::vector old_info_log_files; InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), dbname_); + std::unordered_set files_to_del; for (const auto& candidate_file : candidate_files) { std::string to_delete = candidate_file.file_name; uint32_t path_id = candidate_file.path_id; @@ -437,6 +464,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { // DontDeletePendingOutputs fail keep = (sst_live_map.find(number) != sst_live_map.end()) || number >= state.min_pending_output; + if (!keep) { + files_to_del.insert(number); + } break; case kTempFile: // Any temp files that are currently being written to must @@ -498,6 +528,19 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { } } + { + // After purging obsolete files, remove them from files_grabbed_for_purge_. + // Use a temporary vector to perform bulk deletion via swap. + InstrumentedMutexLock guard_lock(&mutex_); + std::vector tmp; + for (auto fn : files_grabbed_for_purge_) { + if (files_to_del.count(fn) == 0) { + tmp.emplace_back(fn); + } + } + files_grabbed_for_purge_.swap(tmp); + } + // Delete old info log files. size_t old_info_log_file_count = old_info_log_files.size(); if (old_info_log_file_count != 0 && @@ -557,4 +600,5 @@ void DBImpl::DeleteObsoleteFiles() { job_context.Clean(); mutex_.Lock(); } + } // namespace rocksdb diff --git a/db/obsolete_files_test.cc b/db/obsolete_files_test.cc new file mode 100644 index 0000000000..dbee89a644 --- /dev/null +++ b/db/obsolete_files_test.cc @@ -0,0 +1,217 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include +#include "db/db_impl.h" +#include "db/version_set.h" +#include "db/write_batch_internal.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/transaction_log.h" +#include "util/filename.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/testharness.h" +#include "util/testutil.h" + +using std::cerr; +using std::cout; +using std::endl; +using std::flush; + +namespace rocksdb { + +class ObsoleteFilesTest : public testing::Test { + public: + std::string dbname_; + Options options_; + DB* db_; + Env* env_; + int numlevels_; + + ObsoleteFilesTest() { + db_ = nullptr; + env_ = Env::Default(); + // Trigger compaction when the number of level 0 files reaches 2. + options_.level0_file_num_compaction_trigger = 2; + options_.disable_auto_compactions = false; + options_.delete_obsolete_files_period_micros = 0; // always do full purge + options_.enable_thread_tracking = true; + options_.write_buffer_size = 1024*1024*1000; + options_.target_file_size_base = 1024*1024*1000; + options_.max_bytes_for_level_base = 1024*1024*1000; + options_.WAL_ttl_seconds = 300; // Used to test log files + options_.WAL_size_limit_MB = 1024; // Used to test log files + dbname_ = test::TmpDir() + "/double_deletefile_test"; + options_.wal_dir = dbname_ + "/wal_files"; + + // clean up all the files that might have been there before + std::vector old_files; + env_->GetChildren(dbname_, &old_files); + for (auto file : old_files) { + env_->DeleteFile(dbname_ + "/" + file); + } + env_->GetChildren(options_.wal_dir, &old_files); + for (auto file : old_files) { + env_->DeleteFile(options_.wal_dir + "/" + file); + } + + DestroyDB(dbname_, options_); + numlevels_ = 7; + EXPECT_OK(ReopenDB(true)); + } + + Status ReopenDB(bool create) { + delete db_; + if (create) { + DestroyDB(dbname_, options_); + } + db_ = nullptr; + options_.create_if_missing = create; + return DB::Open(options_, dbname_, &db_); + } + + void CloseDB() { + delete db_; + db_ = nullptr; + } + + void AddKeys(int numkeys, int startkey) { + WriteOptions options; + options.sync = false; + for (int i = startkey; i < (numkeys + startkey) ; i++) { + std::string temp = ToString(i); + Slice key(temp); + Slice value(temp); + ASSERT_OK(db_->Put(options, key, value)); + } + } + + int numKeysInLevels( + std::vector &metadata, + std::vector *keysperlevel = nullptr) { + + if (keysperlevel != nullptr) { + keysperlevel->resize(numlevels_); + } + + int numKeys = 0; + for (size_t i = 0; i < metadata.size(); i++) { + int startkey = atoi(metadata[i].smallestkey.c_str()); + int endkey = atoi(metadata[i].largestkey.c_str()); + int numkeysinfile = (endkey - startkey + 1); + numKeys += numkeysinfile; + if (keysperlevel != nullptr) { + (*keysperlevel)[(int)metadata[i].level] += numkeysinfile; + } + fprintf(stderr, "level %d name %s smallest %s largest %s\n", + metadata[i].level, metadata[i].name.c_str(), + metadata[i].smallestkey.c_str(), + metadata[i].largestkey.c_str()); + } + return numKeys; + } + + void createLevel0Files(int numFiles, int numKeysPerFile) { + int startKey = 0; + DBImpl* dbi = reinterpret_cast(db_); + for (int i = 0; i < numFiles; i++) { + AddKeys(numKeysPerFile, startKey); + startKey += numKeysPerFile; + ASSERT_OK(dbi->TEST_FlushMemTable()); + ASSERT_OK(dbi->TEST_WaitForFlushMemTable()); + } + } + + void CheckFileTypeCounts(std::string& dir, + int required_log, + int required_sst, + int required_manifest) { + std::vector filenames; + env_->GetChildren(dir, &filenames); + + int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0; + for (auto file : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(file, &number, &type)) { + log_cnt += (type == kLogFile); + sst_cnt += (type == kTableFile); + manifest_cnt += (type == kDescriptorFile); + } + } + ASSERT_EQ(required_log, log_cnt); + ASSERT_EQ(required_sst, sst_cnt); + ASSERT_EQ(required_manifest, manifest_cnt); + } +}; + +TEST_F(ObsoleteFilesTest, RaceForObsoleteFileDeletion) { + createLevel0Files(2, 50000); + CheckFileTypeCounts(options_.wal_dir, 1, 0, 0); + + SyncPoint::GetInstance()->LoadDependency({ + {"DBImpl::BackgroundCallCompaction:FoundObsoleteFiles", + "ObsoleteFilesTest::RaceForObsoleteFileDeletion:1"}, + {"DBImpl::BackgroundCallCompaction:PurgedObsoleteFiles", + "ObsoleteFilesTest::RaceForObsoleteFileDeletion:2"}, + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DeleteObsoleteFileImpl:AfterDeletion", [&](void* arg) { + Status* p_status = reinterpret_cast(arg); + ASSERT_TRUE(p_status->ok()); + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::CloseHelper:PendingPurgeFinished", [&](void* arg) { + std::vector* files_grabbed_for_purge_ptr = + reinterpret_cast*>(arg); + ASSERT_TRUE(files_grabbed_for_purge_ptr->empty()); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + DBImpl* dbi = reinterpret_cast(db_); + port::Thread user_thread([&]() { + JobContext jobCxt(0); + TEST_SYNC_POINT("ObsoleteFilesTest::RaceForObsoleteFileDeletion:1"); + dbi->TEST_LockMutex(); + dbi->FindObsoleteFiles(&jobCxt, + true /* force=true */, false /* no_full_scan=false */); + dbi->TEST_UnlockMutex(); + TEST_SYNC_POINT("ObsoleteFilesTest::RaceForObsoleteFileDeletion:2"); + dbi->PurgeObsoleteFiles(jobCxt); + jobCxt.Clean(); + }); + + user_thread.join(); + + CloseDB(); +} + +} //namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, + "SKIPPED as DBImpl::DeleteFile is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE diff --git a/src.mk b/src.mk index 4089bf0f33..510ddfdc03 100644 --- a/src.mk +++ b/src.mk @@ -282,6 +282,7 @@ MAIN_SOURCES = \ db/db_write_test.cc \ db/dbformat_test.cc \ db/deletefile_test.cc \ + db/obsolete_files_test.cc \ db/external_sst_file_basic_test.cc \ db/external_sst_file_test.cc \ db/fault_injection_test.cc \