diff --git a/HISTORY.md b/HISTORY.md index 825c1def47..975ece580d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -29,6 +29,7 @@ * Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number. * Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level. * Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family. +* Fix ingested file and directory not being fsync. * Return TryAgain status in place of Corruption when new tail is not visible to TransactionLogIterator. * Fix a bug caused by secondary not skipping the beginning of new MANIFEST. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f3fc96d8d1..e2de696ef5 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -861,16 +861,6 @@ Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const { return ret_dir; } -Directory* DBImpl::Directories::GetDataDir(size_t path_id) const { - assert(path_id < data_dirs_.size()); - Directory* ret_dir = data_dirs_[path_id].get(); - if (ret_dir == nullptr) { - // Should use db_dir_ - return db_dir_.get(); - } - return ret_dir; -} - Status DBImpl::SetOptions( ColumnFamilyHandle* column_family, const std::unordered_map& options_map) { @@ -3644,7 +3634,7 @@ Status DBImpl::IngestExternalFiles( auto* cfd = static_cast(arg.column_family)->cfd(); ingestion_jobs.emplace_back(env_, versions_.get(), cfd, immutable_db_options_, env_options_, - &snapshots_, arg.options); + &snapshots_, arg.options, &directories_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e6d5a56e24..b5437c4954 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -77,6 +77,38 @@ struct JobContext; struct ExternalSstFileInfo; struct MemTableInfo; +// Class to maintain directories for all database paths other than main one. +class Directories { + public: + Status SetDirectories(Env* env, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths); + + Directory* GetDataDir(size_t path_id) const { + assert(path_id < data_dirs_.size()); + Directory* ret_dir = data_dirs_[path_id].get(); + if (ret_dir == nullptr) { + // Should use db_dir_ + return db_dir_.get(); + } + return ret_dir; + } + + Directory* GetWalDir() { + if (wal_dir_) { + return wal_dir_.get(); + } + return db_dir_.get(); + } + + Directory* GetDbDir() { return db_dir_.get(); } + + private: + std::unique_ptr db_dir_; + std::vector> data_dirs_; + std::unique_ptr wal_dir_; +}; + // While DB is the public interface of RocksDB, and DBImpl is the actual // class implementing it. It's the entrance of the core RocksdB engine. // All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a @@ -1047,30 +1079,6 @@ class DBImpl : public DB { } }; - // Class to maintain directories for all database paths other than main one. - class Directories { - public: - Status SetDirectories(Env* env, const std::string& dbname, - const std::string& wal_dir, - const std::vector& data_paths); - - Directory* GetDataDir(size_t path_id) const; - - Directory* GetWalDir() { - if (wal_dir_) { - return wal_dir_.get(); - } - return db_dir_.get(); - } - - Directory* GetDbDir() { return db_dir_.get(); } - - private: - std::unique_ptr db_dir_; - std::vector> data_dirs_; - std::unique_ptr wal_dir_; - }; - struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) : number(_number) {} void AddSize(uint64_t new_size) { size += new_size; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index eec7cf16aa..13d6959d47 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -265,9 +265,9 @@ Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname, return env->NewDirectory(dirname, directory); } -Status DBImpl::Directories::SetDirectories( - Env* env, const std::string& dbname, const std::string& wal_dir, - const std::vector& data_paths) { +Status Directories::SetDirectories(Env* env, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths) { Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_); if (!s.ok()) { return s; diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 91a422bed9..ff7da502af 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -9,6 +9,7 @@ #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/sst_file_writer.h" +#include "test_util/fault_injection_test_env.h" #include "test_util/testutil.h" namespace rocksdb { @@ -20,6 +21,7 @@ class ExternalSSTFileBasicTest public: ExternalSSTFileBasicTest() : DBTestBase("/external_sst_file_basic_test") { sst_files_dir_ = dbname_ + "/sst_files/"; + fault_injection_test_env_.reset(new FaultInjectionTestEnv(Env::Default())); DestroyAndRecreateExternalSSTFilesDir(); } @@ -140,6 +142,7 @@ class ExternalSSTFileBasicTest protected: std::string sst_files_dir_; + std::unique_ptr fault_injection_test_env_; }; TEST_F(ExternalSSTFileBasicTest, Basic) { @@ -689,6 +692,59 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(ExternalSSTFileBasicTest, SyncFailure) { + Options options; + options.create_if_missing = true; + options.env = fault_injection_test_env_.get(); + + std::vector> test_cases = { + {"ExternalSstFileIngestionJob::BeforeSyncIngestedFile", + "ExternalSstFileIngestionJob::AfterSyncIngestedFile"}, + {"ExternalSstFileIngestionJob::BeforeSyncDir", + "ExternalSstFileIngestionJob::AfterSyncDir"}, + {"ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno", + "ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"}}; + + for (size_t i = 0; i < test_cases.size(); i++) { + SyncPoint::GetInstance()->SetCallBack(test_cases[i].first, [&](void*) { + fault_injection_test_env_->SetFilesystemActive(false); + }); + SyncPoint::GetInstance()->SetCallBack(test_cases[i].second, [&](void*) { + fault_injection_test_env_->SetFilesystemActive(true); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + if (i == 2) { + ASSERT_OK(Put("foo", "v1")); + } + + Options sst_file_writer_options; + std::unique_ptr sst_file_writer( + new SstFileWriter(EnvOptions(), sst_file_writer_options)); + std::string file_name = + sst_files_dir_ + "sync_failure_test_" + ToString(i) + ".sst"; + ASSERT_OK(sst_file_writer->Open(file_name)); + ASSERT_OK(sst_file_writer->Put("bar", "v2")); + ASSERT_OK(sst_file_writer->Finish()); + + IngestExternalFileOptions ingest_opt; + if (i == 0) { + ingest_opt.move_files = true; + } + const Snapshot* snapshot = db_->GetSnapshot(); + if (i == 2) { + ingest_opt.write_global_seqno = true; + } + ASSERT_FALSE(db_->IngestExternalFile({file_name}, ingest_opt).ok()); + db_->ReleaseSnapshot(snapshot); + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + Destroy(options); + } +} + TEST_P(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) { int kNumLevels = 7; Options options = CurrentOptions(); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 7e9657cc90..44b5016856 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -7,11 +7,13 @@ #include "db/external_sst_file_ingestion_job.h" -#include #include +#include #include +#include #include +#include "db/db_impl/db_impl.h" #include "db/version_edit.h" #include "file/file_util.h" #include "table/merging_iterator.h" @@ -86,6 +88,7 @@ Status ExternalSstFileIngestionJob::Prepare( } // Copy/Move external files into DB + std::unordered_set ingestion_path_ids; for (IngestedFileInfo& f : files_to_ingest_) { f.fd = FileDescriptor(next_file_number++, 0, f.file_size); f.copy_file = false; @@ -95,8 +98,26 @@ Status ExternalSstFileIngestionJob::Prepare( f.fd.GetPathId()); if (ingestion_options_.move_files) { status = env_->LinkFile(path_outside_db, path_inside_db); - if (status.IsNotSupported() && - ingestion_options_.failed_move_fall_back_to_copy) { + if (status.ok()) { + // It is unsafe to assume application had sync the file and file + // directory before ingest the file. For integrity of RocksDB we need + // to sync the file. + std::unique_ptr file_to_sync; + status = env_->ReopenWritableFile(path_inside_db, &file_to_sync, + env_options_); + if (status.ok()) { + TEST_SYNC_POINT( + "ExternalSstFileIngestionJob::BeforeSyncIngestedFile"); + status = SyncIngestedFile(file_to_sync.get()); + TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile"); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync ingested file %s: %s", + path_inside_db.c_str(), status.ToString().c_str()); + } + } + } else if (status.IsNotSupported() && + ingestion_options_.failed_move_fall_back_to_copy) { // Original file is on a different FS, use copy instead of hard linking. f.copy_file = true; } @@ -107,6 +128,7 @@ Status ExternalSstFileIngestionJob::Prepare( if (f.copy_file) { TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile", nullptr); + // CopyFile also sync the new file. status = CopyFile(env_, path_outside_db, path_inside_db, 0, db_options_.use_fsync); } @@ -115,8 +137,25 @@ Status ExternalSstFileIngestionJob::Prepare( break; } f.internal_file_path = path_inside_db; + ingestion_path_ids.insert(f.fd.GetPathId()); } + TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir"); + if (status.ok()) { + for (auto path_id : ingestion_path_ids) { + status = directories_->GetDataDir(path_id)->Fsync(); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync directory %" ROCKSDB_PRIszt + " while ingest file: %s", + path_id, status.ToString().c_str()); + break; + } + } + } + TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir"); + + // TODO: The following is duplicated with Cleanup(). if (!status.ok()) { // We failed, remove all files that we copied into the db for (IngestedFileInfo& f : files_to_ingest_) { @@ -559,6 +598,18 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( std::string seqno_val; PutFixed64(&seqno_val, seqno); status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val); + if (status.ok()) { + TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno"); + status = SyncIngestedFile(rwfile.get()); + TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync ingested file %s after writing global " + "sequence number: %s", + file_to_ingest->internal_file_path.c_str(), + status.ToString().c_str()); + } + } if (!status.ok()) { return status; } @@ -599,6 +650,16 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( return true; } +template +Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) { + assert(file != nullptr); + if (db_options_.use_fsync) { + return file->Fsync(); + } else { + return file->Sync(); + } +} + } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index baa8e9f0f6..50f3944054 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -20,6 +20,8 @@ namespace rocksdb { +class Directories; + struct IngestedFileInfo { // External file path std::string external_file_path; @@ -77,7 +79,8 @@ class ExternalSstFileIngestionJob { Env* env, VersionSet* versions, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, - const IngestExternalFileOptions& ingestion_options) + const IngestExternalFileOptions& ingestion_options, + Directories* directories) : env_(env), versions_(versions), cfd_(cfd), @@ -85,8 +88,11 @@ class ExternalSstFileIngestionJob { env_options_(env_options), db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), + directories_(directories), job_start_time_(env_->NowMicros()), - consumed_seqno_(false) {} + consumed_seqno_(false) { + assert(directories != nullptr); + } // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, @@ -153,6 +159,10 @@ class ExternalSstFileIngestionJob { bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest, int level); + // Helper method to sync given file. + template + Status SyncIngestedFile(TWritableFile* file); + Env* env_; VersionSet* versions_; ColumnFamilyData* cfd_; @@ -161,6 +171,7 @@ class ExternalSstFileIngestionJob { SnapshotList* db_snapshots_; autovector files_to_ingest_; const IngestExternalFileOptions& ingestion_options_; + Directories* directories_; VersionEdit edit_; uint64_t job_start_time_; bool consumed_seqno_; diff --git a/test_util/fault_injection_test_env.cc b/test_util/fault_injection_test_env.cc index a591ff4b57..5c47b7ea45 100644 --- a/test_util/fault_injection_test_env.cc +++ b/test_util/fault_injection_test_env.cc @@ -98,6 +98,9 @@ Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const { } Status TestDirectory::Fsync() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } env_->SyncDir(dirname_); return dir_->Fsync(); } @@ -158,6 +161,53 @@ Status TestWritableFile::Sync() { return Status::OK(); } +TestRandomRWFile::TestRandomRWFile(const std::string& /*fname*/, + std::unique_ptr&& f, + FaultInjectionTestEnv* env) + : target_(std::move(f)), file_opened_(true), env_(env) { + assert(target_ != nullptr); +} + +TestRandomRWFile::~TestRandomRWFile() { + if (file_opened_) { + Close(); + } +} + +Status TestRandomRWFile::Write(uint64_t offset, const Slice& data) { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Write(offset, data); +} + +Status TestRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Read(offset, n, result, scratch); +} + +Status TestRandomRWFile::Close() { + file_opened_ = false; + return target_->Close(); +} + +Status TestRandomRWFile::Flush() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Flush(); +} + +Status TestRandomRWFile::Sync() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Sync(); +} + Status FaultInjectionTestEnv::NewDirectory(const std::string& name, std::unique_ptr* result) { std::unique_ptr r; @@ -220,6 +270,27 @@ Status FaultInjectionTestEnv::ReopenWritableFile( return s; } +Status FaultInjectionTestEnv::NewRandomRWFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status s = target()->NewRandomRWFile(fname, result, soptions); + if (s.ok()) { + result->reset(new TestRandomRWFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. + UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + Status FaultInjectionTestEnv::NewRandomAccessFile( const std::string& fname, std::unique_ptr* result, const EnvOptions& soptions) { @@ -238,7 +309,6 @@ Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), s.ToString().c_str()); } - assert(s.ok()); if (s.ok()) { UntrackFile(f); } diff --git a/test_util/fault_injection_test_env.h b/test_util/fault_injection_test_env.h index d962acfd58..b68b3faedc 100644 --- a/test_util/fault_injection_test_env.h +++ b/test_util/fault_injection_test_env.h @@ -82,6 +82,31 @@ class TestWritableFile : public WritableFile { FaultInjectionTestEnv* env_; }; +// A wrapper around WritableFileWriter* file +// is written to or sync'ed. +class TestRandomRWFile : public RandomRWFile { + public: + explicit TestRandomRWFile(const std::string& fname, + std::unique_ptr&& f, + FaultInjectionTestEnv* env); + virtual ~TestRandomRWFile(); + Status Write(uint64_t offset, const Slice& data) override; + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + Status Close() override; + Status Flush() override; + Status Sync() override; + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + bool use_direct_io() const override { return target_->use_direct_io(); }; + + private: + std::unique_ptr target_; + bool file_opened_; + FaultInjectionTestEnv* env_; +}; + class TestDirectory : public Directory { public: explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, @@ -114,6 +139,10 @@ class FaultInjectionTestEnv : public EnvWrapper { std::unique_ptr* result, const EnvOptions& soptions) override; + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& soptions) override; + Status NewRandomAccessFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& soptions) override;